API Reference#

Overview#

The SYMFLUENCE Python API provides programmatic access to the full workflow, from project setup through calibration and analysis. The primary entry point is the SYMFLUENCE class, which coordinates manager components through the WorkflowOrchestrator.

Quick Start#

Basic Usage#

from symfluence import SYMFLUENCE

# Initialize from configuration file
conf = SYMFLUENCE("my_project.yaml")

# Run complete workflow
conf.run_workflow()

# Or run individual steps
conf.setup_project()
conf.define_domain()
conf.acquire_forcings()
conf.run_models()

Step-by-Step Execution#

from symfluence import SYMFLUENCE

# Initialize
conf = SYMFLUENCE("config.yaml")

# 1. Project Setup
conf.setup_project()
conf.create_pour_point()

# 2. Domain Definition
conf.acquire_attributes()
conf.define_domain()
conf.discretize_domain()

# 3. Data Acquisition
conf.process_observed_data()
conf.acquire_forcings()
conf.run_model_agnostic_preprocessing()

# 4. Model Execution
conf.preprocess_models()
conf.run_models()
conf.postprocess_results()

# 5. Calibration (optional)
conf.calibrate_model()

# 6. Analysis
conf.run_benchmarking()
conf.run_sensitivity_analysis()

Configuration Access#

from symfluence import SYMFLUENCE
from symfluence.core.config import SymfluenceConfig

# Load typed configuration directly
config = SymfluenceConfig.from_file("config.yaml")

# Access typed attributes
print(f"Domain: {config.domain.name}")
print(f"Model: {config.model.hydrological_model}")
print(f"Start: {config.experiment.time_start}")

# Initialize SYMFLUENCE with typed config
conf = SYMFLUENCE(config)

Core API#

SYMFLUENCE (Main Class)#

The primary interface for all SYMFLUENCE operations.

class symfluence.core.system.SYMFLUENCE(config_input, config_overrides=None, debug_mode=False, visualize=False, diagnostic=False)[source]#

Bases: object

Enhanced SYMFLUENCE main class with comprehensive CLI support.

This class serves as the central coordinator for all SYMFLUENCE operations, with enhanced CLI capabilities including individual step execution, pour point setup, SLURM job submission, and comprehensive workflow management.

Parameters:
  • config_input (Path | str | SymfluenceConfig)

  • config_overrides (Dict[str, Any])

  • debug_mode (bool)

  • visualize (bool)

  • diagnostic (bool)

__init__(config_input, config_overrides=None, debug_mode=False, visualize=False, diagnostic=False)[source]#

Initialize the SYMFLUENCE system with configuration and CLI options.

Parameters:
  • config_input (Path | str | SymfluenceConfig) – Path to the configuration file or a SymfluenceConfig instance

  • config_overrides (Dict[str, Any]) – Dictionary of configuration overrides from CLI

  • debug_mode (bool) – Whether to enable debug mode

  • visualize (bool) – Whether to enable visualization

  • diagnostic (bool) – Whether to enable diagnostic plots for workflow validation

run_workflow(force_run=None)[source]#

Execute the complete SYMFLUENCE workflow (CLI wrapper).

Parameters:

force_run (bool | None)

Return type:

None

run_individual_steps(step_names)[source]#

Execute specific workflow steps by name.

Allows selective execution of individual workflow steps rather than running the complete pipeline. Useful for debugging, testing, or re-running specific portions of the workflow.

Parameters:

step_names (List[str]) – List of step names to execute (e.g., ['setup_project', 'calibrate_model'])

Return type:

None
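Before passing names to run_individual_steps, it can help to check them against the steps you expect and put them in workflow order. The sketch below is a minimal illustration. It assumes that step names match the method names listed under Step-by-Step Execution (the example above uses 'setup_project' and 'calibrate_model', which do); KNOWN_STEPS and order_steps are hypothetical and not part of SYMFLUENCE, and the list may be incomplete.

```python
from typing import List, Sequence

# Assumed step names, copied from the Step-by-Step Execution listing.
# The orchestrator may define additional steps not shown here.
KNOWN_STEPS: Sequence[str] = (
    "setup_project",
    "create_pour_point",
    "acquire_attributes",
    "define_domain",
    "discretize_domain",
    "process_observed_data",
    "acquire_forcings",
    "run_model_agnostic_preprocessing",
    "preprocess_models",
    "run_models",
    "postprocess_results",
    "calibrate_model",
    "run_benchmarking",
    "run_sensitivity_analysis",
)


def order_steps(requested: Sequence[str]) -> List[str]:
    """Reject unknown names, drop duplicates, and sort into workflow order."""
    unknown = [name for name in requested if name not in KNOWN_STEPS]
    if unknown:
        raise ValueError(f"Unknown workflow steps: {unknown}")
    return sorted(set(requested), key=KNOWN_STEPS.index)


steps = order_steps(["calibrate_model", "setup_project", "run_models"])
print(steps)
# With a real instance: conf.run_individual_steps(steps)
```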

get_workflow_status()[source]#

Return workflow completion status from the orchestrator.

Returns:

Workflow status payload with step_details and counts.

Return type:

Dict[str, Any]

run_diagnostics_for_step(step_name)[source]#

Run diagnostic plots for a specific workflow step on existing outputs.

Parameters:

step_name (str) – Name of the workflow step to diagnose

Returns:

List of paths to generated diagnostic plots

Return type:

List[str]

run_all_diagnostics()[source]#

Run all available diagnostic plots on existing workflow outputs.

Returns:

List of paths to generated diagnostic plots

Return type:

List[str]

Initialization:

from symfluence import SYMFLUENCE

# From YAML file path
conf = SYMFLUENCE("path/to/config.yaml")

# From SymfluenceConfig object
from symfluence.core.config import SymfluenceConfig
config = SymfluenceConfig.from_file("config.yaml")
conf = SYMFLUENCE(config)

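# From dictionary
# Note: the documented config_input types are Path | str | SymfluenceConfig,
# so dictionary input is not covered by the signature above. The "..." marks
# omitted keys; replace it with real entries before running.
config_dict = {"DOMAIN_NAME": "test", ...}
conf = SYMFLUENCE(config_dict)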

Core Methods:

# Full workflow execution
conf.run_workflow(force_run=False)

# Workflow status
status = conf.get_workflow_status()
# Returns: {
#   "total_steps": 15,
#   "completed_steps": 8,
#   "pending_steps": 7,
#   "step_details": [...]
# }
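The status payload can be reduced to a one-line progress summary. The following is a minimal sketch that assumes the payload carries the keys shown in the comment above (total_steps, completed_steps, pending_steps). summarize_status is a hypothetical helper, not part of SYMFLUENCE, and the sample dictionary stands in for a real conf.get_workflow_status() call.

```python
from typing import Any, Dict


def summarize_status(status: Dict[str, Any]) -> str:
    """Format a workflow status payload as a short progress line.

    Hypothetical helper: relies only on the keys shown in the example payload.
    """
    total = status.get("total_steps", 0)
    done = status.get("completed_steps", 0)
    pending = status.get("pending_steps", total - done)
    percent = 100.0 * done / total if total else 0.0
    return f"{done}/{total} steps complete ({percent:.0f}%), {pending} pending"


# Sample payload standing in for conf.get_workflow_status()
sample_status = {
    "total_steps": 15,
    "completed_steps": 8,
    "pending_steps": 7,
    "step_details": [],
}
print(summarize_status(sample_status))
```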

Manager Classes#

SYMFLUENCE uses a manager-based architecture where each major subsystem has a dedicated manager class.

Project Manager#

Handles project initialization and structure.

Project management for SYMFLUENCE hydrological modeling setups.

Handles project directory structure creation, pour point generation, and project metadata management for hydrological model domains.

class symfluence.project.project_manager.ProjectManager(config, logger)[source]#

Bases: ConfigurableMixin

Manages project-level operations including directory structure and initialization.

The ProjectManager is responsible for creating and managing the project directory structure, handling pour point creation, and maintaining project metadata. It serves as the foundation for all other SYMFLUENCE components by establishing the physical file organization that the workflow depends on.

Key responsibilities:
  • Creating the project directory structure

  • Generating pour point shapefiles from coordinates

  • Validating project structure integrity

  • Providing project metadata to other components

Parameters:
config#

Configuration dictionary containing project settings

Type:

Dict[str, Any]

logger#

Logger instance for recording operations

Type:

logging.Logger

__init__(config, logger)[source]#

Initialize the ProjectManager.

Parameters:
  • config (SymfluenceConfig) – SymfluenceConfig instance

  • logger (Logger) – Logger instance

Raises:

TypeError – If config is not a SymfluenceConfig instance

setup_project()[source]#

Set up the project directory structure.

Creates the main project directory and all required subdirectories. Project layout (canonical, post-2026):

{project_dir}/
    shapefiles/{pour_point, catchment, river_network, river_basins}/
    data/
        attributes/     <- DEM, soil, landclass, etc.
        forcing/        <- raw + merged + basin-averaged forcing
        observations/   <- streamflow, snotel, etc.
        model_ready/    <- model-agnostic store

The data/ prefix is created up-front so that resolve_data_subdir consistently resolves to the new layout for fresh projects. Without this, setup_project used to create the legacy attributes/ directory directly, which made resolve_data_subdir pick the legacy path on subsequent reads. Some downstream callers (e.g. TauDEM in define_domain) construct the path from string templates anchored at data/attributes/... and then fail to find the DEM that was written into the legacy attributes/... tree.

Legacy projects with a pre-existing attributes/ directory continue to work via the backward-compat branch in resolve_data_subdir — this change only affects fresh setup_project runs.

Returns:

Path to the created project directory

Return type:

Path

Raises:

OSError – If directory creation fails due to permission or disk space issues
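To make the layout concrete, the sketch below recreates the directory tree described for setup_project inside a temporary directory. It is an illustration built only from the directory names listed in that description, not the library's implementation; LAYOUT and create_layout are hypothetical names.

```python
import tempfile
from pathlib import Path
from typing import List

# Subdirectories named in the layout description, relative to the project directory.
LAYOUT: List[str] = [
    "shapefiles/pour_point",
    "shapefiles/catchment",
    "shapefiles/river_network",
    "shapefiles/river_basins",
    "data/attributes",
    "data/forcing",
    "data/observations",
    "data/model_ready",
]


def create_layout(project_dir: Path) -> Path:
    """Create the documented directory tree (illustrative only)."""
    for relative in LAYOUT:
        (project_dir / relative).mkdir(parents=True, exist_ok=True)
    return project_dir


with tempfile.TemporaryDirectory() as tmp:
    root = create_layout(Path(tmp) / "domain_example")
    created = sorted(
        p.relative_to(root).as_posix() for p in root.rglob("*") if p.is_dir()
    )
    print("\n".join(created))
```

Creating data/ first is what lets a path helper such as resolve_data_subdir find the new layout on later reads, as the description for setup_project explains.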

create_pour_point()[source]#

Create the outlet (pour point) shapefile from coordinates if specified.

Writes a GeoDataFrame of one or more point geometries and saves it as a shapefile. POUR_POINT_COORDS is the primary, most-downstream outlet that defines the domain extent (id 0). Any POUR_POINT_ADDITIONAL_COORDS are written as additional interior outlets (id 1..N) so TauDEM breaks the stream network at each, forcing subbasin/GRU boundaries to align with interior gauges. Additional outlets only apply to semidistributed / distributed delineation; they are ignored (with a warning) for other methods, where a single basin is delineated. If POUR_POINT_COORDS is ‘default’, assumes a user-provided pour point shapefile exists.

Returns:

Path to the created pour point shapefile if successful; None if using a user-provided shapefile or if creation fails

Return type:

Optional[Path]

Raises:
  • ValueError – If the pour point coordinates are in an invalid format

  • Exception – For other errors during shapefile creation
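The outlet numbering described for create_pour_point (primary outlet as id 0, interior outlets as id 1..N) can be sketched without any geospatial dependencies. The example below is a hypothetical illustration: the (latitude, longitude) tuple ordering, the record fields, and the coordinates are all assumptions, and the real method writes a shapefile rather than a list of dictionaries.

```python
from typing import Dict, List, Tuple

Coordinate = Tuple[float, float]  # (latitude, longitude); the ordering is an assumption


def build_outlet_records(
    primary: Coordinate, additional: List[Coordinate]
) -> List[Dict[str, object]]:
    """Number outlets as described: primary outlet 0, interior outlets 1..N."""
    records: List[Dict[str, object]] = [
        {"id": 0, "lat": primary[0], "lon": primary[1], "role": "primary"}
    ]
    for index, (lat, lon) in enumerate(additional, start=1):
        records.append({"id": index, "lat": lat, "lon": lon, "role": "interior"})
    return records


# Made-up coordinates for illustration only
outlets = build_outlet_records((51.17, -115.57), [(51.30, -115.90), (51.42, -116.18)])
for record in outlets:
    print(record)
```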

get_project_info()[source]#

Get information about the project configuration.

Collects key project metadata into a dictionary for reporting, logging, or providing status information to other components.

The returned information includes:
  • Domain name

  • Experiment ID

  • Project directory path

  • Data directory path

  • Pour point coordinates

Returns:

Dictionary containing project information

Return type:

Dict[str, Any]

Key Methods:

from symfluence.project.project_manager import ProjectManager

pm = ProjectManager(config, logger)

# Setup project directory structure
pm.setup_project()

# Create pour point from coordinates
pm.create_pour_point()

# Get project information
info = pm.get_project_info()

Domain Manager#

Manages domain definition and discretization.

Domain management facade for SYMFLUENCE geospatial operations.

Coordinates domain definition, delineation, and discretization workflows with integrated visualization and artifact tracking.

class symfluence.geospatial.domain_manager.DomainManager(config, logger, reporting_manager=None)[source]#

Bases: ConfigurableMixin

Orchestrates all geospatial domain operations for hydrological modeling setup.

This manager coordinates domain definition (delineation), spatial discretization, and artifact tracking for hydrological modeling workflows. It provides a unified interface for creating HRU (Hydrologic Response Unit) configurations from various spatial data sources and discretization strategies.

Architecture:

Facade Pattern - Coordinates specialized geospatial services:
  • DomainDelineator: Watershed boundary extraction and network topology

  • DomainDiscretizationRunner: HRU creation via spatial disaggregation

  • Artifact Tracking: Maintains references to all created shapefiles

  • Visualization Integration: Optional reporting for QA/QC

Domain Definition Methods:
point:
  • Creates square bounding box domain from coordinates

  • Use case: FLUXNET sites, point-scale modeling

  • Output: Single polygon shapefile

lumped:
  • Single-basin watershed delineation from pour point

  • Use case: Traditional lumped hydrological modeling

  • Output: Single watershed polygon + optional delineated routing network

  • Special: Supports lumped-to-distributed routing workflow

  • With subset_from_geofabric=True: Dissolves geofabric basins to single polygon

semidistributed:
  • Full TauDEM-based watershed delineation from DEM

  • Use case: Detailed distributed modeling with subcatchments

  • Output: River network + subcatchment polygons

  • Optional: Coastal watershed handling

  • With subset_from_geofabric=True: Extracts from existing geofabric

distributed:
  • Regular grid domain with D8 flow direction

  • Use case: Grid-based land surface models (VIC, MESH, CLM)

  • Output: Grid cells as both HRUs and routing segments

  • grid_source='generate': Create grid from bounding box

  • grid_source='native': Match forcing data resolution

Discretization Strategies:
lumped:
  • Single HRU representing entire basin

  • No spatial disaggregation

elevation:
  • Elevation bands (e.g., 100m intervals)

  • Use case: Snow modeling, orographic effects

landclass:
  • Land cover types (forest, urban, agriculture, etc.)

  • Use case: Land surface heterogeneity

soilclass:
  • Soil classification types

  • Use case: Infiltration and runoff variability

aspect:
  • Slope aspect classes (N, NE, E, SE, S, SW, W, NW)

  • Use case: Solar radiation and snow redistribution

radiation:
  • Potential solar radiation classes

  • Use case: Energy balance modeling

combined:
  • Multiple attributes combined (e.g., elevation × landclass)

  • Use case: Capturing complex spatial heterogeneity

  • Handles attribute interactions and MultiPolygons
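The elevation, aspect, and combined strategies all reduce to assigning each location a class and grouping by it. The sketch below shows that idea on a handful of made-up cells. The band edges (multiples of the band size) and the 45-degree aspect sectors centred on north are assumptions for illustration; SYMFLUENCE may bin differently, and elevation_band and aspect_class are hypothetical helpers.

```python
import math
from collections import Counter
from typing import List, Tuple


def elevation_band(elevation_m: float, band_size_m: float = 100.0) -> int:
    """Return the lower edge (in metres) of the band containing an elevation."""
    return int(math.floor(elevation_m / band_size_m) * band_size_m)


def aspect_class(aspect_deg: float) -> str:
    """Map an aspect in degrees clockwise from north to one of eight classes."""
    labels = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
    # Shift by half a sector (22.5 degrees) so that north is centred on 0.
    sector = int(((aspect_deg % 360.0) + 22.5) // 45.0) % 8
    return labels[sector]


# Made-up (elevation in metres, aspect in degrees) pairs
cells: List[Tuple[float, float]] = [
    (1234.0, 10.0),
    (1288.0, 350.0),
    (1310.0, 185.0),
    (1395.0, 170.0),
]

# "combined" strategy: one class per unique (elevation band, aspect class) pair
hru_counts = Counter((elevation_band(z), aspect_class(a)) for z, a in cells)
for key, count in sorted(hru_counts.items()):
    print(key, count)
```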

Workflow Sequence:
  1. define_domain(): Create/extract watershed boundaries → Produces DelineationArtifacts (river basins, network, pour point)

  2. discretize_domain(): Subdivide into HRUs → Produces DiscretizationArtifacts (HRU shapefile, attributes)

  3. Visualization (optional): Spatial QA/QC plots → Generated via reporting_manager if available

Artifact Tracking:
DelineationArtifacts:
  • method: Domain definition method used

  • river_basins_path: Path to basin shapefile

  • river_network_path: Path to river network shapefile

  • pour_point_path: Path to pour point shapefile

  • metadata: Additional delineation metadata

DiscretizationArtifacts:
  • method: Discretization method used

  • hru_shapefile_path: Path to HRU shapefile

  • attributes: HRU attributes DataFrame

  • statistics: Discretization statistics (HRU count, min/max areas)

Configuration Dependencies:
Domain Definition:
  • DOMAIN_DEFINITION_METHOD: point/lumped/semidistributed/distributed

  • DOMAIN_NAME: Basin identifier

  • SUBSET_FROM_GEOFABRIC: Extract from existing geofabric (default: False)

  • GRID_SOURCE (distributed): ‘generate’ or ‘native’

  • NATIVE_GRID_DATASET (distributed + native): Dataset identifier (default: ‘era5’)

  • POUR_POINT_SHP_PATH (lumped/semidistributed): Pour point location

  • RIVER_NETWORK_SHP_PATH (subset): Existing river network

  • DOMAIN_BOUNDING_BOX: Bbox coordinates

  • GRID_CELL_SIZE (distributed): Grid spacing in meters

Discretization:
  • SUB_GRID_DISCRETIZATION: Discretization method

  • DEM_PATH: Elevation data (for elevation/aspect/radiation)

  • LAND_CLASS_PATH: Land cover data (for landclass)

  • SOIL_CLASS_PATH: Soil data (for soilclass)

  • ELEVATION_BAND_SIZE: Band interval in meters (default: 100)

Delineation (TauDEM):
  • DELINEATE_COASTAL_WATERSHEDS: Coastal handling (True/False)

  • ROUTING_DELINEATION: Routing network strategy

Output Files:

Shapefiles created in project_dir/shapefiles/.

Delineation outputs: river_basins, river_network, pour_point shapefiles.

Discretization outputs: catchment HRU shapefiles.

Example structure:

shapefiles/
├── river_basins/
│   └── bow_river_riverBasins_lumped.shp
├── river_network/
│   └── bow_river_riverNetwork_lumped.shp
├── pour_point/
│   └── bow_river_pourPoint.shp
└── catchment/
    └── bow_river_HRUs_elevation.shp
Special Workflows:
Lumped-to-Distributed Routing:
  1. Define lumped domain (single watershed polygon)

  2. Internally delineate subcatchments within lumped domain

  3. Create area-weighted remapping (lumped HRU to distributed routing)

  4. Enables distributed routing with lumped hydrology
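Step 3 of the lumped-to-distributed workflow, the area-weighted remapping, can be sketched as follows. This is a minimal illustration under stated assumptions: the lumped model yields a single runoff volume per time step, each routing subcatchment receives a share proportional to its area, and the names area_weights and distribute_runoff are hypothetical rather than SYMFLUENCE functions.

```python
from typing import Dict


def area_weights(subcatchment_areas_km2: Dict[str, float]) -> Dict[str, float]:
    """Return each subcatchment's fraction of the total area."""
    total = sum(subcatchment_areas_km2.values())
    if total <= 0:
        raise ValueError("Total subcatchment area must be positive")
    return {name: area / total for name, area in subcatchment_areas_km2.items()}


def distribute_runoff(lumped_volume_m3: float, weights: Dict[str, float]) -> Dict[str, float]:
    """Split one lumped runoff volume across routing subcatchments."""
    return {name: lumped_volume_m3 * weight for name, weight in weights.items()}


# Made-up areas for three routing subcatchments inside one lumped basin
areas = {"sub_1": 120.0, "sub_2": 300.0, "sub_3": 180.0}
weights = area_weights(areas)
inflows = distribute_runoff(1000.0, weights)
print(weights)
print(inflows)
```

Because the weights sum to one, the total volume entering the routing network equals the lumped runoff volume.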

Coastal Watershed Delineation:
  • Special handling for basins draining to ocean

  • Avoids river network artifacts at coastline

  • Uses modified TauDEM workflow

Grid-Based Distributed:
  • Creates regular grid cells

  • Assigns D8 flow direction from DEM

  • Detects and fixes routing cycles

  • Each cell is both HRU and routing segment
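Detecting routing cycles in a grid where every cell points to at most one downstream cell amounts to following pointers until an outlet or a repeat is reached. The sketch below illustrates this on a small made-up network. It is not the library's algorithm, and the repair shown (turning one cell of each cycle into an outlet) is an assumption chosen for simplicity.

```python
from typing import Dict, List, Optional


def find_routing_cycles(downstream: Dict[int, Optional[int]]) -> List[List[int]]:
    """Return each cycle found in a cell -> downstream-cell mapping.

    None marks an outlet. Each cell has at most one downstream neighbour,
    so following pointers from a cell either reaches an outlet or loops.
    """
    state: Dict[int, int] = {}  # 1 = on the current path, 2 = fully explored
    cycles: List[List[int]] = []
    for start in downstream:
        path: List[int] = []
        cell: Optional[int] = start
        while cell is not None and cell not in state:
            state[cell] = 1
            path.append(cell)
            cell = downstream.get(cell)
        if cell is not None and state.get(cell) == 1:
            # The walk returned to a cell on the current path: a cycle.
            cycles.append(path[path.index(cell):])
        for visited in path:
            state[visited] = 2
    return cycles


def break_cycles(downstream: Dict[int, Optional[int]]) -> Dict[int, Optional[int]]:
    """Illustrative repair: make the last cell of each cycle an outlet."""
    fixed = dict(downstream)
    for cycle in find_routing_cycles(fixed):
        fixed[cycle[-1]] = None
    return fixed


# Cells 1 -> 2 -> 3 drain to an outlet; cells 4 -> 5 -> 6 -> 4 form a loop fed by 7
grid = {1: 2, 2: 3, 3: None, 4: 5, 5: 6, 6: 4, 7: 4}
print(find_routing_cycles(grid))
print(find_routing_cycles(break_cycles(grid)))
```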

Visualization Integration:

If reporting_manager is available:
  • Delineation: Watershed boundary maps, river network plots

  • Discretization: HRU spatial distribution, attribute histograms

  • QA/QC: Identifies potential issues (small HRUs, disconnected polygons)

Error Handling:
  • Validates configuration before execution

  • Raises descriptive errors for missing required shapefiles

  • Logs warnings for non-critical issues

  • Provides context for TauDEM failures

Example Workflow:
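>>> from symfluence.core.config import SymfluenceConfig
>>> from symfluence.geospatial.domain_manager import DomainManager
>>> # setup_logger and ReportingManager are assumed to be imported already;
>>> # their module paths are not given in this reference.
>>> config = SymfluenceConfig.from_file('config.yaml')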
>>> logger = setup_logger()
>>> reporting = ReportingManager(config, logger)
>>>
>>> # Initialize manager
>>> domain_mgr = DomainManager(config, logger, reporting)
>>>
>>> # Define watershed boundaries
>>> domain_mgr.define_domain()
>>> print(domain_mgr.delineation_artifacts.river_basins_path)
# ./shapefiles/river_basins/bow_river_riverBasins_lumped.shp
>>>
>>> # Discretize into elevation bands
>>> domain_mgr.discretize_domain()
>>> print(domain_mgr.discretization_artifacts.statistics)
# {'hru_count': 8, 'min_area_km2': 120.5, 'max_area_km2': 450.2}
Performance Considerations:
  • Delineation: ~1-30 minutes (depends on DEM resolution, TauDEM)

  • Discretization: ~10 seconds - 5 minutes (depends on attribute resolution)

  • Grid generation: ~1-10 minutes (depends on grid cell count)

  • Memory: Peak during raster operations (~2-8 GB for high-res DEMs)

Notes

  • DomainDelineator initialized eagerly

  • DomainDiscretizationRunner initialized lazily when needed

  • Artifacts tracked for downstream workflows (preprocessing, modeling)

  • Reporting integration provides visual validation

  • Supports both simple (lumped) and complex (combined attributes) setups

See also

  • geospatial.delineation.DomainDelineator: Watershed delineation

  • geospatial.discretization.DomainDiscretizationRunner: HRU creation

  • geospatial.discretization.core.DomainDiscretizer: Discretization engine

  • geospatial.geofabric: Geofabric delineation backends

__init__(config, logger, reporting_manager=None)[source]#

Initialize the Domain Manager.

Parameters:
  • config (SymfluenceConfig) – SymfluenceConfig instance

  • logger (Logger) – Logger instance

  • reporting_manager (Any | None) – ReportingManager instance

Raises:

TypeError – If config is not a SymfluenceConfig instance

create_point_domain_shapefile()[source]#

Create a square basin shapefile from bounding box coordinates for point modelling.

This method creates a rectangular polygon from the BOUNDING_BOX_COORDS and saves it as a shapefile for point-based modelling approaches.

Returns:

Path to the created shapefile or None if failed

Return type:

Path | None

define_domain()[source]#

Define the domain using the configured method.

Returns:

Tuple of the domain result and delineation artifacts

Return type:

Tuple[Path | Tuple[Path, Path] | None, DelineationArtifacts]

discretize_domain()[source]#

Discretize the domain into HRUs or GRUs.

Returns:

Tuple of HRU shapefile(s) and discretization artifacts

Return type:

Tuple[Path | dict | None, DiscretizationArtifacts]

visualize_domain()[source]#

Create visualization of the domain.

Returns:

Path to the created plot or None if failed

Return type:

Path | None

visualize_discretized_domain()[source]#

Create visualization of the discretized domain.

Returns:

Path to the created plot or None if failed

Return type:

Path | None

get_domain_info()[source]#

Get information about the current domain configuration.

Returns:

Dictionary containing domain information

Return type:

Dict[str, Any]

validate_domain_configuration()[source]#

Validate domain configuration settings.

Deprecated: use validate_readiness() instead. Config-level checks (required keys, definition method, bounding box format) are now handled by Pydantic validators at config construction time.

Returns:

True if configuration is valid, False otherwise

Return type:

bool

validate_readiness()[source]#

Validate that this manager is ready for execution.

Checks runtime prerequisites that Pydantic cannot verify at config construction time: subset geofabric config and grid source for distributed method.

Returns:

Dict mapping check names to pass/fail booleans.

Return type:

Dict[str, bool]
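Since validate_readiness returns a mapping of check names to booleans, a caller can turn it into an actionable message before starting a run. The sketch below assumes only that return shape. failed_checks is a hypothetical helper and the check names in the sample are invented for illustration.

```python
from typing import Dict, List


def failed_checks(readiness: Dict[str, bool]) -> List[str]:
    """Return the names of readiness checks that did not pass, sorted."""
    return sorted(name for name, passed in readiness.items() if not passed)


# Invented check names standing in for dm.validate_readiness()
sample_readiness = {"subset_geofabric_config": True, "grid_source": False}

problems = failed_checks(sample_readiness)
if problems:
    print("Not ready, failed checks: " + ", ".join(problems))
else:
    print("All readiness checks passed")
```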

Key Methods:

from symfluence.geospatial.domain_manager import DomainManager

dm = DomainManager(config, logger)

# Define domain boundaries
dm.define_domain()

# Discretize into HRUs/GRUs
dm.discretize_domain()

# Get domain information
info = dm.get_domain_info()

Data Manager#

Coordinates data acquisition and preprocessing.

Data Manager

Facade that coordinates acquisition, observation processing, and model-agnostic preprocessing. Keeps orchestration thin while services handle the heavy lifting. See docs under docs/source/configuration and docs/source/data for full workflows.

class symfluence.data.data_manager.DataManager(config, logger, reporting_manager=None)[source]#

Bases: BaseManager

Facade that orchestrates acquisition, preprocessing, and observation handling.

Delegates to acquisition/preprocessing services and registries; keeps runtime imports slim. Detailed behaviour lives in the docs.

acquire_attributes()[source]#

Acquire geospatial attributes (DEM, soil classes, land cover) for the domain.

Downloads and processes required geospatial data layers including elevation, soil classification, and land cover data from configured data sources.

acquire_forcings()[source]#

Acquire meteorological forcing data for the simulation period.

Downloads forcing variables (precipitation, temperature, radiation, etc.) from the configured forcing dataset (ERA5, RDRS, CARRA, etc.) for the specified temporal domain.

acquire_observations()[source]#

Acquire observational data for model calibration and validation.

Downloads streamflow observations, snow measurements, and other validation data from configured observation sources (USGS, WSC, SNOTEL, etc.).

acquire_em_earth_forcings()[source]#

Acquire EM-Earth supplementary forcing data.

Downloads and processes EM-Earth reanalysis data for gap-filling or supplementing primary forcing datasets.

process_observed_data()[source]#

Process observed data including streamflow and additional variables.

Raises:

DataAcquisitionError – If data processing fails

run_model_agnostic_preprocessing()[source]#

Run model-agnostic preprocessing including basin averaging and resampling.

Raises:

DataAcquisitionError – If preprocessing fails

build_model_ready_store()[source]#

Build or refresh the model-ready data store.

Creates CF-1.8 compliant NetCDF files for forcings, observations, and attributes in data/model_ready/.

validate_data_directories()[source]#

Validate that required data directories exist.

Deprecated: use validate_readiness() instead.

Return type:

bool

validate_readiness()[source]#

Validate that this manager is ready for execution.

Checks whether required data directories exist.

Returns:

Dict mapping check names to pass/fail booleans.

Return type:

Dict[str, bool]

get_data_status()[source]#

Get status of data acquisition and preprocessing.

Return type:

Dict[str, Any]

Key Methods:

from symfluence.data.data_manager import DataManager

data_mgr = DataManager(config, logger)

# Acquire geospatial attributes
data_mgr.acquire_attributes()

# Process observed streamflow data
data_mgr.process_observed_data()

# Acquire forcing data (ERA5, RDRS, etc.)
data_mgr.acquire_forcings()

# Run model-agnostic preprocessing
data_mgr.run_model_agnostic_preprocessing()

Model Manager#

Coordinates model preprocessing, execution, and postprocessing.

Model Manager

Lightweight facade that resolves model workflows, runs preprocess/execute/ postprocess/visualize steps, and delegates component lookups to ModelRegistry. Detailed behaviour now lives in the docs (see docs/source/architecture and docs/source/models/*).

class symfluence.models.model_manager.ModelManager(config, logger, reporting_manager=None)[source]#

Bases: BaseManager

Facade that turns a hydrological model list into an ordered workflow.

Resolves routing dependencies, then runs preprocess → execute → postprocess → visualize phases using components pulled from ModelRegistry. Full behaviour is covered in the docs; keeping this concise speeds imports.

preprocess_models(params=None)[source]#

Preprocess forcing data into model-specific input formats.

Transforms generic forcing data (from data acquisition) into model-specific input formats. Invokes registered preprocessor for each model in resolved workflow. Preprocessors handle all model-specific input requirements.

Preprocessing Workflow:
  1. Resolve model workflow (includes implicit dependencies)

  2. For each model in workflow:
     a. Create model input directory (project_dir/forcing/{MODEL}_input/)
     b. Retrieve preprocessor class from ModelRegistry
     c. Instantiate preprocessor with config, logger, and params
     d. Run preprocessor.run_preprocessing()

  3. Preprocessor outputs go to model-specific input directories

Model-Specific Preprocessing Examples:

SUMMA:
  • ERA5 NetCDF → SUMMA forcing file format

  • Time step interpolation/aggregation

  • Unit conversion (SI → SUMMA units)

  • Spatial interpolation to model grid

FUSE:
  • Catchment-averaged forcing extraction

  • Temporal aggregation to model timestep

  • Unit conversion

GR (Rainfall-Runoff):
  • Daily precipitation, temperature aggregation

  • Missing value handling

mizuRoute:
  • Basin delineation and network structure

  • Unit hydrograph parameters

  • Routing network initialization

Parameter Usage:

The params dict is passed to the preprocessor for calibration scenarios:
  • The preprocessor may use params to adjust input processing

  • Example: Parameter-dependent unit conversion or scaling

  • If the preprocessor doesn't accept params, they are ignored

Parameters:

params (Dict[str, Any] | None) – Optional dictionary of parameter values.
  • Example: {'SAI_SV': 0.3, 'snowCriticalTemp': -1.5}

  • Used for calibration (different parameter values → different inputs)

  • If None, default parameter values from the config are used

  • The preprocessor determines whether params are needed (introspection)

Raises:

Exception – If preprocessing fails for any model. The error is caught internally, logged with a full traceback to aid debugging, and then re-raised.

Side Effects:
  • Creates project_dir/forcing/{MODEL}_input/ directories

  • Generates model-specific input files

  • Logs preprocessing progress and errors to logger

Examples

>>> # Standard preprocessing with default parameters
>>> manager.preprocess_models()
>>> # Preprocessing with parameter variations (calibration)
>>> params = {'param1': 0.5, 'param2': 100.0}
>>> manager.preprocess_models(params=params)

Notes

  • LSTM and similar data-driven models skip preprocessing

  • Registry lookup enables new models without modifying this method

  • Parameter introspection (inspect.signature) handles optional params

  • Errors in preprocessing halt workflow and raise exception

See also

  • ModelRegistry.get_preprocessor(): Retrieve preprocessor class

  • run_models(): Execute preprocessed models

  • postprocess_results(): Extract and standardize results
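The notes for preprocess_models mention that parameter introspection via inspect.signature handles optional params. A minimal sketch of that pattern follows. The two preprocessor classes and the accepts_params and build_preprocessor helpers are hypothetical stand-ins, not SYMFLUENCE classes; only inspect.signature is a real standard-library call.

```python
import inspect
from typing import Any, Callable, Dict, Optional


def accepts_params(factory: Callable[..., Any]) -> bool:
    """Return True if the callable has a 'params' argument or accepts **kwargs."""
    parameters = inspect.signature(factory).parameters
    if "params" in parameters:
        return True
    return any(p.kind is inspect.Parameter.VAR_KEYWORD for p in parameters.values())


class PlainPreprocessor:
    """Stand-in preprocessor whose constructor takes no params."""

    def __init__(self, config: Dict[str, Any], logger: Any) -> None:
        self.config, self.logger = config, logger


class CalibrationAwarePreprocessor:
    """Stand-in preprocessor that accepts calibration params."""

    def __init__(self, config: Dict[str, Any], logger: Any,
                 params: Optional[Dict[str, Any]] = None) -> None:
        self.config, self.logger, self.params = config, logger, params


def build_preprocessor(cls: Callable[..., Any], config: Dict[str, Any], logger: Any,
                       params: Optional[Dict[str, Any]] = None) -> Any:
    """Pass params only to constructors that can take them."""
    if params is not None and accepts_params(cls):
        return cls(config, logger, params=params)
    return cls(config, logger)


trial = {"SAI_SV": 0.3, "snowCriticalTemp": -1.5}
plain = build_preprocessor(PlainPreprocessor, {}, None, trial)
aware = build_preprocessor(CalibrationAwarePreprocessor, {}, None, trial)
print(type(plain).__name__, hasattr(plain, "params"))
print(type(aware).__name__, aware.params)
```

Checking the signature first lets one call site serve both kinds of preprocessor without a try/except around the constructor.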

run_models()[source]#

Execute models in resolved workflow order.

Prefers dCoupler graph-based execution when available for multi-model workflows (provides conservation checking, spatial remapping, and unit conversion). Falls back to sequential registry-based execution when dCoupler is not installed or graph execution fails.

See also

  • ModelRegistry.get_runner(): Retrieve runner class

  • ModelRegistry.get_runner_method(): Get method name

  • preprocess_models(): Prepare inputs

  • postprocess_results(): Extract and standardize outputs

postprocess_results()[source]#

Post-process model results using the registry.

Extracts streamflow and other relevant outputs from model-specific result files and converts them to a standardized format for evaluation and comparison. After postprocessing, calculates and logs baseline performance metrics.

The standardized interface expects postprocessors to implement extract_streamflow() method, which saves results to: project_dir/results/{experiment_id}_results.csv

Note

Automatically triggers visualization of timeseries results after extraction. Falls back to legacy extract_results() method for backward compatibility.

log_baseline_performance()[source]#

Log baseline model performance metrics before calibration.

Calculates and logs performance metrics comparing simulated and observed streamflow after the initial model run. This provides a diagnostic snapshot of model performance before calibration, enabling users to:
  1. Assess initial model setup quality

  2. Detect configuration issues

  3. Establish a baseline for improvement assessment

  4. Identify models needing attention before calibration

Metrics Calculated:

KGE (Kling-Gupta Efficiency):
  • Formula: KGE = 1 - sqrt((r-1)² + (α-1)² + (β-1)²), where r is the correlation coefficient, α is the ratio of simulated to observed standard deviation, and β is the ratio of simulated to observed mean

  • Range: (-∞, 1]

  • KGE >= 0.7 indicates reasonable performance; 0.5 <= KGE < 0.7 requires calibration; KGE < 0.5 needs significant improvement

  • Using the observed mean as the simulation gives KGE = 1 - sqrt(2) ≈ -0.41, so only values below about -0.41 are worse than that benchmark

KGE' (Modified KGE):
  • Symmetric variant for metric comparison

  • Useful when comparing multiple model configurations

NSE (Nash-Sutcliffe Efficiency):
  • Formula: NSE = 1 - (Σ(Qobs-Qsim)² / Σ(Qobs-Qmean)²), where Qmean is the mean of the observations

  • Squared-error metric with range (-∞, 1]; higher is better, NSE = 0 matches the skill of the observed mean, and negative values are worse than it

  • Less sensitive to bias and variability than KGE

Bias (%):
  • Formula: Bias = ((Mean_Sim - Mean_Obs) / Mean_Obs) × 100

  • Positive values indicate that the model overestimates; negative values indicate that it underestimates

  • Can indicate systematic model errors

Data Sources:
  1. Simulation Results: results_dir/{experiment_id}_results.csv
     • Generated by postprocess_results()
     • Contains model discharge columns

  2. Observations: Multiple fallback strategies
     a. Results file column (if 'obs' or 'observed' in column name)
     b. Observations directory: project_dir/observations/streamflow/preprocessed/
     c. External observation file with datetime and discharge columns

Workflow:
  1. Load simulation results CSV (index=datetime)

  2. Find observation column or load from observations directory

  3. For each simulation column (e.g., 'SUMMA_discharge_cms'):
     a. Align observations and simulation by datetime index
     b. Remove NaN pairs
     c. Calculate metrics (KGE, KGE', NSE, Bias)
     d. Log results with interpretation

  4. Log footer with metrics summary

Output Format:

========================================================
BASELINE MODEL PERFORMANCE (before calibration)
========================================================
  MODELNAME:
    KGE  = 0.7234
    KGE' = 0.7156
    NSE  = 0.6987
    Bias = +5.3%
    Valid data points: 1825
  Note: KGE >= 0.7 indicates reasonable baseline performance
========================================================
Error Handling:
  • Results file not found: Skipped with debug message

  • No observations found: Skipped with debug message

  • Insufficient valid data (<10 points): Logged as warning

  • Metric calculation errors: Caught and logged as debug

Side Effects:
  • Logs baseline metrics to logger.info()

  • Logs interpretation and recommendations

  • No files created or modified

Examples

>>> # Called automatically by postprocess_results()
>>> manager.postprocess_results()  # Includes baseline logging
>>> # Or called directly
>>> manager.log_baseline_performance()

Notes

  • Called automatically by postprocess_results()

  • Requires results file from postprocessing

  • Useful for QA/QC before calibration begins

  • Graceful degradation if data not available

  • KGE interpretation helps understand model biases

See also

  • postprocess_results(): Automatically calls log_baseline_performance()

  • kge(), kge_prime(), nse(): Metric calculation functions

  • evaluation.metrics: Metric library
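The KGE, NSE, and Bias formulas given for log_baseline_performance can be checked with a few lines of standard-library Python. The sketch below is a plain restatement of those formulas using population statistics on made-up data. It is not the evaluation.metrics implementation, baseline_metrics is a hypothetical name, KGE' is omitted because its formula is not given here, and degenerate inputs (constant series, zero observed mean) are not handled.

```python
import math
from statistics import fmean, pstdev
from typing import Dict, List, Sequence


def baseline_metrics(obs: Sequence[float], sim: Sequence[float]) -> Dict[str, float]:
    """Compute KGE, NSE and percent bias from paired observed/simulated values.

    Pairs containing NaN are dropped first, mirroring the "Remove NaN pairs" step.
    """
    pairs = [(o, s) for o, s in zip(obs, sim) if not (math.isnan(o) or math.isnan(s))]
    o_vals: List[float] = [o for o, _ in pairs]
    s_vals: List[float] = [s for _, s in pairs]
    mean_o, mean_s = fmean(o_vals), fmean(s_vals)
    std_o, std_s = pstdev(o_vals), pstdev(s_vals)

    covariance = fmean([(o - mean_o) * (s - mean_s) for o, s in pairs])
    r = covariance / (std_o * std_s)  # correlation coefficient
    alpha = std_s / std_o             # variability ratio
    beta = mean_s / mean_o            # mean ratio

    kge = 1.0 - math.sqrt((r - 1.0) ** 2 + (alpha - 1.0) ** 2 + (beta - 1.0) ** 2)
    squared_error = sum((o - s) ** 2 for o, s in pairs)
    obs_spread = sum((o - mean_o) ** 2 for o in o_vals)
    nse = 1.0 - squared_error / obs_spread
    bias_percent = 100.0 * (mean_s - mean_o) / mean_o
    return {"KGE": kge, "NSE": nse, "Bias": bias_percent, "n": float(len(pairs))}


# Made-up series: each valid simulated value is 10 % above the observation
observed = [10.0, 12.0, float("nan"), 18.0, 15.0]
simulated = [11.0, 13.2, 14.0, 19.8, 16.5]
metrics = baseline_metrics(observed, simulated)
print({name: round(value, 4) for name, value in metrics.items()})
```

With a uniform 10 % overestimate the correlation stays at 1 while α and β both become 1.1, so the KGE penalty comes entirely from the variability and bias terms.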

visualize_outputs()[source]#

Visualize model outputs using registered model visualizers.

Invokes visualization functions for each primary model in the configuration. Visualizers are registered per-model and handle model-specific output formats.

Note

Requires reporting_manager to be configured. Skips visualization if not available. Each model can register its own visualization function with the ModelRegistry.

Key Methods:

from symfluence.models.model_manager import ModelManager

mm = ModelManager(config, logger)

# Preprocess for all configured models
mm.preprocess_models()

# Run model simulations
mm.run_models()

# Extract and format results
mm.postprocess_results()

# Get available models
models = mm.get_available_models()

Optimization Manager#

Handles calibration and optimization.

Optimization Manager

Coordinates calibration runs by selecting algorithms and model-specific optimizers. Keeps orchestration lean; algorithm details and examples are in docs/source/calibration and optimizer pages.

class symfluence.optimization.optimization_manager.OptimizationManager(config, logger, reporting_manager=None)[source]#

Bases: BaseManager

Facade over model-specific optimizers for iterative calibration.

Chooses algorithm, fetches optimizer from the registry, and runs it; heavy lifting stays in the optimizer classes. See docs for workflow details.

Parameters:
property optimizers: Any#

Backward compatibility: expose registered optimizers/algorithms.

run_optimization_workflow()[source]#

Run main optimization workflow based on configuration.

Entry point for complete optimization process. Checks configuration to determine which optimization methods to execute and runs them in sequence. Currently supports ‘iteration’ (calibration) and handles deprecated method warnings.

Workflow:
  1. Check config.optimization.methods (list of methods)

  2. For each enabled method:
       - ‘iteration’: Run calibrate_model() for iterative optimization
       - Deprecated methods: Log warnings

  3. Return results from all executed methods

Configuration:

optimization.methods: List of optimization methods
  - Example: [‘iteration’] enables calibration
  - Example: [‘iteration’, ‘emulation’] runs calibration and logs a deprecation warning for ‘emulation’, which is not executed

Supported Methods:
  • ‘iteration’: Iterative parameter optimization (calibration)

Deprecated Methods (logged as warnings):
  • ‘differentiable_parameter_emulation’: Use gradient-based (ADAM/LBFGS) instead

  • ‘emulation’: Use model emulation libraries instead

Returns:

Results from completed workflows
  - Keys: Method names (e.g., ‘calibration’)
  - Values: Path to results file as string, or None if failed
  - Example: {‘calibration’: ‘/path/to/results.csv’}

Return type:

Dict[str, Any]

Side Effects:
  • Logs method execution and warnings to logger

  • Calls calibrate_model() if ‘iteration’ enabled

  • May create results files and directories

Examples

>>> # Standard workflow
>>> opt_mgr = OptimizationManager(config, logger)
>>> results = opt_mgr.run_optimization_workflow()
>>> if 'calibration' in results:
...     print(f"Calibration results: {results['calibration']}")
>>> # With deprecated method (warning logged)
>>> # config.optimization.methods = ['iteration', 'emulation']
>>> results = opt_mgr.run_optimization_workflow()
>>> # Logs warning about 'emulation' being deprecated

Notes

  • Only ‘iteration’ currently implemented

  • Deprecated methods logged but not executed

  • Non-empty results dict indicates at least one method ran

  • Empty dict means no methods were enabled or all failed

See also

  • calibrate_model(): Run iterative optimization

  • get_optimization_status(): Check optimization configuration
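The dispatch logic described for run_optimization_workflow() can be sketched as follows. This is an illustration of the documented behaviour, not the library source: dispatch_methods, DEPRECATED_METHODS and the calibrate callable are hypothetical names, and recording a failed calibration as a None value is an assumption based on the Returns description.

```python
import logging

# Deprecated method names and the replacement advice given in the docs above.
DEPRECATED_METHODS = {
    "differentiable_parameter_emulation":
        "use gradient-based algorithms (ADAM/LBFGS) instead",
    "emulation": "use model emulation libraries instead",
}


def dispatch_methods(methods, calibrate, logger):
    """Run 'iteration' through calibrate(); only warn about deprecated methods."""
    results = {}
    for method in methods:
        if method == "iteration":
            path = calibrate()
            # Store the path as a string, or None when calibration failed.
            results["calibration"] = str(path) if path is not None else None
        elif method in DEPRECATED_METHODS:
            logger.warning("Method %r is deprecated: %s",
                           method, DEPRECATED_METHODS[method])
    return results


logger = logging.getLogger("dispatch_sketch")
results = dispatch_methods(
    ["iteration", "emulation"],
    calibrate=lambda: "/path/to/results.csv",  # stand-in for calibrate_model()
    logger=logger,
)
only_deprecated = dispatch_methods(["emulation"], lambda: None, logger)
```

In this sketch a configuration that lists only deprecated methods yields an empty dict, which matches the note that an empty result means nothing ran.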

calibrate_model()[source]#

Calibrate model(s) using configured optimization algorithm.

Coordinates iterative parameter optimization for one or more hydrological models using the registry-based unified optimizer infrastructure. Handles configuration validation, optimizer instantiation, algorithm selection, and execution.

Calibration Workflow:
  1. Check if ‘iteration’ is in config.optimization.methods
       - If not, log info and return None (disabled)

  2. Get algorithm from config.optimization.algorithm (default: ‘PSO’)
       - Supported: DDS, ASYNC_DDS, PSO, DE, SCE-UA, NSGA-II, ADAM, LBFGS

  3. Parse configured hydrological models (config.model.hydrological_model)
       - Comma-separated list, e.g., ‘SUMMA,FUSE’
       - Upper-case normalization

  4. For each model:
       a. Call _calibrate_with_registry(model, algorithm)
       b. Collect results
       c. Log completion

  5. Return the result of the last model calibrated (with a single model, that model’s result)

Algorithm Selection:

Via config.optimization.algorithm:
  - DDS, ASYNC-DDS, ASYNCDDS, ASYNC_DDS: Dynamically Dimensioned Search
  - PSO: Particle Swarm Optimization
  - SCE-UA: Shuffled Complex Evolution
  - DE: Differential Evolution
  - NSGA-II: Multi-objective non-dominated sorting
  - ADAM: Gradient-based with adaptive moments
  - LBFGS: Gradient-based quasi-Newton method

Registry-Based Model Optimization:

Each model uses a model-specific optimizer from OptimizerRegistry:
  - OptimizerRegistry.get_optimizer(‘MODELNAME’) returns the optimizer class
  - The optimizer class inherits from BaseModelOptimizer
  - Example: SUMMAOptimizer for SUMMA calibration

Configuration Parameters:
Workflow Control:

optimization.methods: Must contain ‘iteration’
optimization.algorithm: Algorithm name (PSO, DDS, ADAM, etc.)

Model Selection:

model.hydrological_model: Comma-separated model names

Algorithm-Specific (if applicable):

optimization.adam_steps: Number of steps (default: 100)
optimization.adam_learning_rate: Learning rate (default: 0.01)
optimization.lbfgs_steps: Max steps (default: 50)
optimization.lbfgs_learning_rate: Step size (default: 0.1)

Returns:

Path to last completed calibration results file
  - None if: disabled, no models configured, or all failed
  - Path if: at least one model calibration completed successfully
  - Typically: project_dir/optimization/{model}_{algorithm}_results.csv

Return type:

Optional[Path]

Raises:

Nothing is raised to the caller. The following errors are caught internally, and the method returns None instead:
  • Registry lookup failures

  • Optimizer instantiation errors

  • Algorithm execution failures

Side Effects:
  • Creates project_dir/optimization/ directory

  • Generates model-specific results files

  • Logs calibration progress and status to logger

  • Modifies reporting_manager state (if configured)

Examples

>>> # Single model with DDS
>>> opt_mgr = OptimizationManager(config, logger)
>>> results_path = opt_mgr.calibrate_model()
>>> if results_path:
...     print(f"Calibration completed: {results_path}")
>>> # Multiple models (SUMMA + FUSE)
>>> # config.model.hydrological_model = 'SUMMA,FUSE'
>>> results_path = opt_mgr.calibrate_model()  # Returns FUSE results
>>> # Disabled calibration
>>> # config.optimization.methods = ['forward']  (no 'iteration')
>>> results_path = opt_mgr.calibrate_model()
>>> assert results_path is None

Notes

  • Disabled silently returns None (no error)

  • Registry lookup errors logged and skipped

  • Execution errors caught and logged (non-fatal)

  • Multiple models: Last result returned (not aggregated)

See also

  • _calibrate_with_registry(): Registry-based optimizer execution

  • run_optimization_workflow(): Top-level workflow coordinator

  • OptimizerRegistry: Registry for model-specific optimizers

  • BaseModelOptimizer: Base class for model optimizers
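The calibration loop described above (parse models, normalize the algorithm name, look up an optimizer, keep the last successful result) can be sketched like this. The names parse_models, normalize_algorithm and calibrate_all are hypothetical, and the plain dict of callables is only a stand-in for OptimizerRegistry.

```python
import logging

# Spelling variants the docs list for asynchronous DDS, mapped to one key.
ALGORITHM_ALIASES = {"ASYNC-DDS": "ASYNC_DDS", "ASYNCDDS": "ASYNC_DDS"}


def parse_models(hydrological_model):
    """Split a comma-separated model string and upper-case each name."""
    return [m.strip().upper() for m in hydrological_model.split(",") if m.strip()]


def normalize_algorithm(name):
    upper = name.strip().upper()
    return ALGORITHM_ALIASES.get(upper, upper)


def calibrate_all(hydrological_model, algorithm, registry, logger):
    """Calibrate each model in turn; return the last successful result or None."""
    last_result = None
    for model in parse_models(hydrological_model):
        run = registry.get(model)
        if run is None:
            # Registry lookup failure: log and skip this model.
            logger.error("No optimizer registered for %s", model)
            continue
        try:
            last_result = run(normalize_algorithm(algorithm))
        except Exception as exc:  # execution errors are non-fatal
            logger.error("Calibration failed for %s: %s", model, exc)
    return last_result


logger = logging.getLogger("calibration_sketch")
# Hypothetical registry: each entry returns a results path for an algorithm.
registry = {
    "SUMMA": lambda alg: f"optimization/SUMMA_{alg}_results.csv",
    "FUSE": lambda alg: f"optimization/FUSE_{alg}_results.csv",
}
result = calibrate_all("summa, fuse", "async-dds", registry, logger)
unknown = calibrate_all("HYPE", "PSO", registry, logger)
```

With two models configured, only the second model's path is returned, which is why the notes above stress that results are not aggregated.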

get_optimization_status()[source]#

Get status of optimization operations.

Returns:

Dictionary containing optimization status information

Return type:

Dict[str, Any]

validate_optimization_configuration()[source]#

Validate optimization configuration settings.

Deprecated: Use validate_readiness() instead. Algorithm and metric validation is now handled by Pydantic validators at config construction time.

Returns:

Dictionary containing validation results

Return type:

Dict[str, bool]

validate_readiness()[source]#

Validate that this manager is ready for execution.

Checks runtime prerequisites that Pydantic cannot verify: whether the configured model has a registered optimizer and whether calibration parameters are defined.

Returns:

Dict mapping check names to pass/fail booleans.

Return type:

Dict[str, bool]
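A readiness result of this shape can be turned into a short report before starting a long calibration. The check names below are hypothetical placeholders; the real keys are defined by the manager.

```python
def failed_checks(readiness):
    """Return the names of checks that did not pass, in sorted order."""
    return sorted(name for name, passed in readiness.items() if not passed)


# Hypothetical result of validate_readiness(); key names are assumptions.
readiness = {
    "optimizer_registered": True,
    "calibration_parameters_defined": False,
}

missing = failed_checks(readiness)
if missing:
    print("Not ready:", ", ".join(missing))
```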

get_available_optimizers()[source]#

Get list of available optimization algorithms.

Returns:

Dictionary mapping algorithm identifiers to their descriptions

Return type:

Dict[str, str]

load_optimization_results(filename=None)[source]#

Load optimization results from file.

Parameters:

filename (str, optional) – Name of results file to load. If None, uses the default filename based on experiment_id.

Returns:

Dictionary with optimization results. Returns None if loading fails.

Return type:

Optional[Dict]

Key Methods:

from symfluence.optimization.optimization_manager import OptimizationManager

opt = OptimizationManager(config, logger)

# Run full optimization workflow
results = opt.run_optimization_workflow()

# Or run calibration directly
results_path = opt.calibrate_model()

# Check optimization status
status = opt.get_optimization_status()

# Check runtime readiness (validate_optimization_configuration() is deprecated)
validation = opt.validate_readiness()

# Get available optimizers
optimizers = opt.get_available_optimizers()

Analysis Manager#

Performs model evaluation and analysis.

Analysis management for SYMFLUENCE model evaluation workflows.

Coordinates benchmarking, sensitivity analysis, and decision analysis for evaluating hydrological model performance and parameter importance.

class symfluence.evaluation.analysis_manager.AnalysisManager(config, logger, reporting_manager=None)[source]#

Bases: ConfigurableMixin

Orchestrates comprehensive post-calibration analysis of model performance and sensitivity.

Central coordinator for evaluating hydrological model performance through benchmarking, sensitivity analysis, and decision analysis. Provides unified interface to investigate model behavior, parameter importance, and structural choices. Integrates with evaluation framework to generate publication-ready analysis reports and visualizations.

This class implements the Facade Pattern to manage complex analysis workflows across multiple hydrological models (SUMMA, FUSE, GR, HYPE, etc.). Enables systematic investigation of model strengths/weaknesses and parameter contributions to output uncertainty.

Key Responsibilities:

  1. Benchmarking (run_benchmarking): Compare calibrated model against simple reference models (mean flow, seasonality model, persistence model). Purpose: Quantify value added by sophisticated model vs simplicity.

  2. Sensitivity Analysis (run_sensitivity_analysis): Evaluate parameter importance using Morris screening, Sobol indices, or FAST methods. Purpose: Prioritize parameters for observation/data requirements.

  3. Decision Analysis (run_decision_analysis): Assess impact of model structure choices including alternative process representations, parameter sets, and calibration targets. Purpose: Evaluate trade-offs in model complexity vs parsimony.

  4. Visualization: Generate analysis plots via ReportingManager including performance comparisons, sensitivity indices, parameter rankings, and decision analysis trade-off plots.

Analysis Types:

Benchmarking:
Input:
  • Calibrated model results

  • Observed streamflow

  • Reference model outputs

Process:
  1. Run reference models (mean, seasonality, persistence)

  2. Compute performance metrics (KGE, NSE, RMSE)

  3. Compare against calibrated model

  4. Calculate relative improvement

Output:
  • Benchmark comparison table

  • Performance metrics for all models

  • Visualization showing performance rank

Interpretation:

  • KGE(model) > KGE(mean): Model outperforms the naive reference

  • KGE(model) < KGE(mean): Model is worse than the simple average (concerning!)

  • KGE(model) >> KGE(seasonality): Model captures dynamics beyond the seasonal pattern
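The three reference models named above are simple enough to sketch directly. This is an illustration under assumptions: the function names are hypothetical, the seasonality benchmark uses a fixed cycle length rather than calendar days, and NSE is used for the comparison because the mean benchmark scores exactly 0 on it by construction.

```python
def mean_benchmark(obs):
    """Predict the long-term mean at every time step."""
    m = sum(obs) / len(obs)
    return [m] * len(obs)


def persistence_benchmark(obs):
    """Predict each value with the previous observation."""
    # The first step has no predecessor, so it reuses the first observation.
    return [obs[0]] + obs[:-1]


def seasonality_benchmark(obs, period):
    """Predict the mean of all observations at the same position in the cycle."""
    sums = [0.0] * period
    counts = [0] * period
    for i, value in enumerate(obs):
        sums[i % period] += value
        counts[i % period] += 1
    return [sums[i % period] / counts[i % period] for i in range(len(obs))]


def nse(sim, obs):
    mean_obs = sum(obs) / len(obs)
    sse = sum((s - o) ** 2 for s, o in zip(sim, obs))
    sst = sum((o - mean_obs) ** 2 for o in obs)
    return 1 - sse / sst


# Toy record: a perfectly repeating four-step cycle.
obs = [1.0, 2.0, 3.0, 4.0] * 3
scores = {
    "mean": nse(mean_benchmark(obs), obs),
    "seasonality": nse(seasonality_benchmark(obs, period=4), obs),
    "persistence": nse(persistence_benchmark(obs), obs),
}
```

A calibrated model is worth its complexity only if it beats whichever of these scores is highest for the catchment in question.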

Sensitivity Analysis:
Input:
  • Parameter ranges (bounds)

  • Model configuration

  • Model outputs and observations

Sampling Methods:
  • Morris One-At-a-Time: Fast screening (hundreds of samples)

  • Sobol Quasi-Random: Variance-based (thousands of samples)

  • FAST: Spectral approach (on the order of 500 samples)

Output:
  • Sensitivity indices (μ, σ, μ*, S1, ST, etc.)

  • Parameter ranking by importance

  • Parameters grouped as influential vs non-influential

Interpretation:

  • μ* (modified Morris): Average absolute sensitivity (main effect magnitude)

  • S1 (Sobol first order): Fraction of output variance from Xi alone

  • ST (Sobol total): Fraction of output variance involving Xi

Non-influential parameters can be removed from calibration.
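To make μ* and σ concrete, here is a simplified elementary-effects estimate. It uses a radial one-at-a-time design (each random base point is perturbed along one parameter at a time), which is a common simplification of Morris trajectories and not necessarily what SensitivityAnalyzer does; the function name and arguments are assumptions.

```python
import random
import statistics


def elementary_effects(model, bounds, n_base_points=20, delta=0.1, seed=0):
    """Estimate mu* and sigma per parameter from one-at-a-time perturbations."""
    rng = random.Random(seed)
    k = len(bounds)

    def scale(unit_point):
        # Map a point in the unit cube to the physical parameter ranges.
        return [lo + u * (hi - lo) for u, (lo, hi) in zip(unit_point, bounds)]

    effects = [[] for _ in range(k)]
    for _ in range(n_base_points):
        # Leave room for the +delta step inside the unit cube.
        base = [rng.uniform(0.0, 1.0 - delta) for _ in range(k)]
        y0 = model(scale(base))
        for i in range(k):
            stepped = list(base)
            stepped[i] += delta
            effects[i].append((model(scale(stepped)) - y0) / delta)

    mu_star = [statistics.fmean(abs(e) for e in ee) for ee in effects]
    sigma = [statistics.pstdev(ee) for ee in effects]
    return mu_star, sigma


def toy_model(p):
    # Linear toy response: the second parameter has no influence at all.
    return 2.0 * p[0] + 0.0 * p[1] + 5.0 * p[2]


bounds = [(0.0, 1.0), (0.0, 10.0), (0.0, 1.0)]
mu_star, sigma = elementary_effects(toy_model, bounds)
```

The cost is n_base_points × (k + 1) model runs. For a linear model σ is zero; a large σ relative to μ* would point to nonlinearity or interactions.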

Decision Analysis:
Input:
  • Multiple model configurations

  • Results from each configuration

  • Observations for validation

Comparison Axes:
  • Model structure (e.g., SUMMA vs FUSE)

  • Process representation (e.g., 2-layer vs 3-layer soil)

  • Calibration target (KGE vs NSE vs RMSE)

  • Spatial discretization (lumped vs distributed)

Output:
  • Performance metrics for each configuration

  • Trade-off analysis plots

  • Pareto frontier of non-dominated solutions

Use Case:

Decide between 3-layer soil (more parameters, better fit) vs 2-layer (simpler, more generalizable) by comparing performance
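The Pareto frontier mentioned in the output list can be computed with a short dominance check. The sketch below trades parameter count (fewer is better) against KGE (higher is better); the configuration names and scores are invented purely for illustration, and pareto_front is a hypothetical helper.

```python
def pareto_front(configs):
    """Return names of configurations not dominated by any other.

    Each entry is (name, n_params, kge). A configuration is dominated when
    another one is at least as good on both criteria and strictly better on one.
    """
    front = []
    for name, n_params, kge in configs:
        dominated = any(
            (other_n <= n_params and other_kge >= kge)
            and (other_n < n_params or other_kge > kge)
            for _, other_n, other_kge in configs
        )
        if not dominated:
            front.append(name)
    return front


# Made-up numbers, not results from any real experiment.
configs = [
    ("2-layer soil", 8, 0.71),
    ("3-layer soil", 12, 0.78),
    ("3-layer soil, alt", 12, 0.70),
    ("4-layer soil", 16, 0.77),
]
front = pareto_front(configs)
```

Everything off the frontier can be discarded without regret; choosing among the remaining configurations is the actual modelling decision.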

Configuration Parameters:

analysis.benchmarking.enabled: bool (default False)

Enable benchmarking analysis

analysis.benchmarking.reference_models: list

Which reference models to include: [‘mean’, ‘seasonality’, ‘persistence’]

analysis.sensitivity.method: str

Sensitivity method: ‘morris’, ‘sobol’, ‘fast’, ‘delsa’

analysis.sensitivity.num_samples: int

Number of samples for sensitivity analysis

analysis.sensitivity.parameters: list

Which parameters to analyze (subset of all model parameters)

analysis.decision_analysis.configurations: list

List of model configurations to compare

reporting.analysis_enabled: bool

Generate visualization plots of analysis results

Output Structure:

analysis_results/
    benchmarking/
        benchmark_comparison.csv        # Performance metrics table
        benchmark_plots/                # PNG plots
    sensitivity_analysis/
        sensitivity_indices.csv         # Sobol indices, Morris screening
        parameter_ranking.csv           # Ranked by importance
        sensitivity_plots/              # Bar charts, rankings
    decision_analysis/
        configuration_comparison.csv    # Metrics for each configuration
        tradeoff_analysis/              # Trade-off plots, Pareto frontier

Example Usage:

>>> config = SymfluenceConfig.from_file('config.yaml')
>>> logger = setup_logger()
>>> analysis_mgr = AnalysisManager(config, logger)
>>>
>>> # Run benchmarking
>>> benchmark_results = analysis_mgr.run_benchmarking()
>>> # Output: Performance comparison with mean, seasonality, persistence models
>>>
>>> # Run sensitivity analysis
>>> sensitivity_results = analysis_mgr.run_sensitivity_analysis()
>>> # Output: Parameter importance rankings
>>>
>>> # Run decision analysis
>>> decision_results = analysis_mgr.run_decision_analysis()
>>> # Output: Trade-off analysis comparing configurations

Key Methods:

run_benchmarking() → Optional[Path]:

Execute benchmarking against reference models. Returns the path to the benchmark results, or None if benchmarking failed.

run_sensitivity_analysis() → Optional[Dict]:

Execute parameter sensitivity analysis. Returns a dictionary mapping model names to sensitivity results, or None if the analysis was disabled or failed.

run_decision_analysis() → Optional[Dict]:

Execute model structure decision analysis. Returns a dictionary mapping model names to decision analysis results, or None if the analysis was disabled or failed.

run_all_analyses() → Dict[str, Path]:

Execute all configured analyses. Returns a dict of {analysis_type: results_path}.

Performance:

  • Benchmarking: Minutes to hours (depends on model runtime)

  • Sensitivity Analysis: Hours to days (1000+ model evaluations needed)

  • Decision Analysis: Days+ (multiple full model runs)

Integration Points:

  • EvaluationRegistry: Access evaluators for metrics

  • OptimizationManager: Access calibrated parameter sets

  • ModelManager: Run reference models, configurations

  • ReportingManager: Generate analysis visualizations

  • DataManager: Access observations for validation

See also

  • Benchmarker: Low-level benchmarking implementation

  • SensitivityAnalyzer: Low-level sensitivity analysis

  • EvaluationRegistry: Registry of evaluation methods

  • ReportingManager: Visualization of analysis results

Parameters:
config#

Configuration dictionary

Type:

Dict[str, Any]

logger#

Logger instance

Type:

logging.Logger

__init__(config, logger, reporting_manager=None)[source]#

Initialize the Analysis Manager.

Parameters:
  • config (SymfluenceConfig) – SymfluenceConfig instance

  • logger (Logger) – Logger instance

  • reporting_manager (Any | None) – ReportingManager instance

Raises:

TypeError – If config is not a SymfluenceConfig instance

run_benchmarking()[source]#

Run benchmarking analysis to evaluate model performance against reference models.

Benchmarking compares the performance of sophisticated hydrological models against simple reference models (e.g., mean flow, seasonality model) to quantify the value added by the model’s complexity. This process includes:

  1. Preprocessing observed data for the benchmark period

  2. Running simple benchmark models (e.g., mean, seasonality, persistence)

  3. Computing performance metrics for each benchmark

  4. Visualizing benchmark results for comparison

Benchmarking provides a baseline for evaluating model performance and helps identify the minimum acceptable performance for a given watershed.

Returns:

Path to benchmark results file or None if benchmarking failed

Return type:

Optional[Path]

Raises:
  • FileNotFoundError – If required observation data is missing

  • ValueError – If date ranges are invalid

  • Exception – For other errors during benchmarking

run_sensitivity_analysis()[source]#

Run sensitivity analysis to evaluate parameter importance and uncertainty.

Sensitivity analysis quantifies how model parameters influence simulation results and performance metrics. This analysis helps:

  1. Identify which parameters have the most significant impact on model performance

  2. Quantify parameter uncertainty and its effect on predictions

  3. Guide model simplification by identifying insensitive parameters

  4. Inform calibration strategies by focusing on sensitive parameters

The method iterates through configured hydrological models, running model-specific sensitivity analyses where supported. Uses AnalysisRegistry for model-specific analyzers when available, falling back to generic SensitivityAnalyzer for models without custom implementations.

Returns:

Dictionary mapping model names to sensitivity results, or None if the analysis was disabled or failed

Return type:

Optional[Dict]

Raises:
  • FileNotFoundError – If required optimization results are missing

  • Exception – For other errors during sensitivity analysis

run_koopman_analysis(ensemble_df, obs_streamflow, train_end='2007-12-31', eval_start='2008-01-01', rank=None, hankel_d=1, svd_threshold=0.99, dmd_method='fbdmd', analyzer_name='DEFAULT')[source]#

Run Koopman operator analysis of a multi-model hydrological ensemble.

Uses EDMD with structurally diverse model outputs as lifting functions to approximate the Koopman operator. The dictionary contains only model outputs (no observed streamflow), ensuring a fair comparison with the ensemble mean. Observed streamflow is predicted via Ridge regression from the Koopman eigenspace.

Parameters:
  • ensemble_df (pandas.DataFrame) – (T, N) DataFrame of model outputs (m^3/s), aligned daily

  • obs_streamflow (pandas.Series) – (T,) Series of observed streamflow (m^3/s)

  • train_end (str) – Last training date (default “2007-12-31”)

  • eval_start (str) – First evaluation date (default “2008-01-01”)

  • rank (int | None) – Explicit DMD rank (None = auto from SVD threshold)

  • hankel_d (int) – Hankel delay depth for model outputs (1 = no embedding)

  • svd_threshold (float) – Cumulative energy threshold for rank selection

  • dmd_method (str) – ‘standard’ or ‘fbdmd’ (default ‘fbdmd’)

  • analyzer_name (str) – Registry key for the Koopman analyzer (default “DEFAULT”)

Returns:

Dict with eigenvalues, timescales, metrics, mode_loadings, etc., or None if analysis failed.

Return type:

Dict | None
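Two of the parameters above, svd_threshold and hankel_d, are easier to reason about with a small sketch. Both helpers are hypothetical and rest on assumptions: "energy" is taken to be the squared singular values, and the delay embedding simply concatenates d consecutive time steps. The analyzer itself may define either differently.

```python
def select_rank(singular_values, svd_threshold=0.99):
    """Smallest rank whose cumulative energy share reaches the threshold."""
    # Assumption: energy is the squared singular value.
    energy = [s * s for s in singular_values]
    total = sum(energy)
    cumulative = 0.0
    for rank, e in enumerate(energy, start=1):
        cumulative += e
        if cumulative / total >= svd_threshold:
            return rank
    return len(singular_values)


def hankel_embed(rows, d):
    """Concatenate d consecutive rows; returns T - d + 1 rows of length d * N."""
    return [
        [value for j in range(d) for value in rows[t + j]]
        for t in range(len(rows) - d + 1)
    ]


# Singular values must be sorted in descending order, as an SVD returns them.
rank_90 = select_rank([3.0, 2.0, 1.0], svd_threshold=0.9)
rank_99 = select_rank([3.0, 2.0, 1.0], svd_threshold=0.99)

# Three time steps of a two-model ensemble.
snapshots = [[1, 2], [3, 4], [5, 6]]
embedded = hankel_embed(snapshots, d=2)
unchanged = hankel_embed(snapshots, d=1)
```

With hankel_d=1 the data passes through unchanged, which matches the "1 = no embedding" note in the parameter list.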

run_decision_analysis()[source]#

Run decision analysis to assess the impact of model structure choices.

Decision analysis evaluates different model structure configurations (e.g., process representations, parameterizations) to understand their impact on model performance.

Uses AnalysisRegistry to discover model-specific decision analyzers. Each model can register its own analyzer that implements the run_full_analysis() interface returning (results_file, best_combinations).

Returns:

Dictionary mapping model names to decision analysis results, or None if the analysis was disabled or failed

Return type:

Optional[Dict]

get_analysis_status()[source]#

Get status of various analyses.

This method provides a comprehensive status report on the analysis operations. It checks for the existence of key files and directories to determine which analyses have been completed successfully and which are available to run.

The status information includes:
  - Whether benchmarking has been completed
  - Whether sensitivity analysis is available and its results exist
  - Whether decision analysis is available and its results exist
  - Whether optimization results (required for some analyses) exist

This information is useful for tracking progress, diagnosing issues, and providing feedback to users.

Returns:

Dictionary containing analysis status information, including flags for completed analyses and available results

Return type:

Dict[str, Any]

run_multivariate_evaluation(sim_results)[source]#

Run multivariate evaluation against all available observations.

Parameters:

sim_results (Dict[str, pandas.Series])

Return type:

Dict[str, Dict[str, float]]

validate_analysis_requirements()[source]#

Validate that requirements are met for running analyses.

Deprecated: Use validate_readiness() instead.

Returns:

Dictionary indicating which analyses can be run.

Return type:

Dict[str, bool]

validate_readiness()[source]#

Validate that this manager is ready for execution.

Checks runtime prerequisites: processed observations, optimization results (for sensitivity), and simulation outputs (for decision analysis). All analyses require processed observations as a baseline.

Returns:

Dict mapping check names to pass/fail booleans.

Return type:

Dict[str, bool]

Key Methods:

from symfluence.evaluation.analysis_manager import AnalysisManager

am = AnalysisManager(config, logger)

# Run benchmarking analysis
am.run_benchmarking()

# Run sensitivity analysis
am.run_sensitivity_analysis()

# Run decision analysis
am.run_decision_analysis()

Workflow Orchestrator#

Manages workflow step execution and dependencies.

Workflow orchestration for SYMFLUENCE hydrological modeling pipeline.

Coordinates the execution sequence of modeling steps including domain definition, data preprocessing, model execution, optimization, and analysis phases.

class symfluence.project.workflow_orchestrator.WorkflowStep(name, cli_name, func, check_func, description)[source]#

Bases: ConfigMixin

Represents a single step in the SYMFLUENCE workflow.

Parameters:
  • name (str)

  • cli_name (str)

  • func (Callable)

  • check_func (Callable)

  • description (str)

name: str#
cli_name: str#
func: Callable#
check_func: Callable#
description: str#

class symfluence.project.workflow_orchestrator.WorkflowOrchestrator(managers, config, logger, logging_manager=None, provenance=None)[source]#

Bases: ConfigMixin

Orchestrates the SYMFLUENCE workflow execution and manages the step sequence.

The WorkflowOrchestrator is responsible for defining, coordinating, and executing the complete SYMFLUENCE modeling workflow. It integrates the various manager components into a coherent sequence of operations, handling dependencies between steps, tracking progress, and providing status information.

Key responsibilities:
  - Defining the sequence of workflow steps and their validation checks
  - Coordinating execution across different manager components
  - Handling execution flow (skipping completed steps, stopping on errors)
  - Providing status information and execution reports
  - Validating prerequisites before workflow execution

This class represents the “conductor” of the SYMFLUENCE system, ensuring that each component performs its tasks in the correct order and with the necessary inputs from previous steps.

Parameters:
  • managers (Dict[str, Any])

  • config (SymfluenceConfig | Dict[str, Any])

  • logger (Logger)

managers#

Dictionary of manager instances

Type:

Dict[str, Any]

config#

Typed configuration object

Type:

SymfluenceConfig

logger#

Logger instance

Type:

logging.Logger

domain_name#

Name of the hydrological domain

Type:

str

experiment_id#

ID of the current experiment

Type:

str

project_dir#

Path to the project directory

Type:

Path

logging_manager#

Reference to logging manager for enhanced formatting

__init__(managers, config, logger, logging_manager=None, provenance=None)[source]#

Initialize the workflow orchestrator.

Parameters:
  • managers (Dict[str, Any]) – Dictionary of manager instances for each functional area

  • config (SymfluenceConfig | Dict[str, Any]) – SymfluenceConfig instance (dicts are auto-converted)

  • logger (Logger) – Logger instance for recording operations

  • logging_manager – Reference to LoggingManager for enhanced formatting

  • provenance – Optional RunProvenance instance for step-level tracking

Raises:

KeyError – If essential configuration values are missing

define_workflow_steps()[source]#

Define the workflow steps with their output validation checks and descriptions.

Returns:

List of WorkflowStep objects

Return type:

List[WorkflowStep]

run_workflow(force_run=False)[source]#

Run the complete workflow according to the defined steps.

This method executes each step in the workflow sequence, handling:
  - Conditional execution based on existing outputs
  - Error handling with configurable stop-on-error behavior
  - Progress tracking and timing information
  - Comprehensive logging of each operation

The workflow can be configured to:
  - Skip steps that have already been completed (default)
  - Force re-execution of all steps (force_run=True)
  - Continue or stop on errors (based on STOP_ON_ERROR config)

Parameters:

force_run (bool) – If True, forces execution of all steps even if outputs exist. If False (default), skips steps with existing outputs.

Raises:

Exception – If a step fails and STOP_ON_ERROR is True in configuration

Note

The method provides detailed logging throughout execution, including:
  - Step headers with progress indicators
  - Execution timing for each step
  - Clear success/skip/failure indicators
  - Final summary statistics
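The skip/force/stop-on-error behaviour described for run_workflow() can be sketched with a small loop. Step and run_steps are hypothetical stand-ins for WorkflowStep and the orchestrator, and the summary dict is an assumption made for illustration.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Step:
    name: str
    func: Callable[[], None]         # does the work
    check_func: Callable[[], bool]   # True when the step's outputs already exist


def run_steps(steps, force_run=False, stop_on_error=True):
    """Run steps in order, skipping completed ones unless force_run is set."""
    summary = {"completed": [], "skipped": [], "failed": []}
    for step in steps:
        if not force_run and step.check_func():
            summary["skipped"].append(step.name)
            continue
        try:
            step.func()
            summary["completed"].append(step.name)
        except Exception:
            summary["failed"].append(step.name)
            if stop_on_error:
                raise
    return summary


# Toy state: pretend setup_project already produced its outputs.
done = {"setup_project"}
steps = [
    Step("setup_project", lambda: done.add("setup_project"),
         lambda: "setup_project" in done),
    Step("define_domain", lambda: done.add("define_domain"),
         lambda: "define_domain" in done),
]
summary = run_steps(steps)
forced = run_steps(steps, force_run=True)
```

Because completion is decided by check_func on existing outputs, rerunning after a crash resumes at the first step whose outputs are missing.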

validate_workflow_prerequisites()[source]#

Validate that all prerequisites are met before running the workflow.

Config-level validation (required keys, types, ranges) is handled by Pydantic at SymfluenceConfig construction time. This method focuses on runtime prerequisites: manager initialization and manager readiness.

Returns:

True if all prerequisites are met, False otherwise

Return type:

bool

run_individual_steps(step_names, continue_on_error=False)[source]#

Execute a specific list of workflow steps by their CLI names.

Parameters:
  • step_names (List[str]) – List of step CLI names to execute

  • continue_on_error (bool) – Whether to continue to next step if one fails

Returns:

List of dictionaries containing execution results for each step

Return type:

List[Dict[str, Any]]

get_workflow_status()[source]#

Get the current status of the workflow execution.

This method examines each step in the workflow to determine whether it has been completed, using the same output validation checks used during execution. It provides a comprehensive view of workflow progress, including which steps are complete and which are pending.

The status information is useful for:
  - Monitoring long-running workflows
  - Generating progress reports
  - Diagnosing execution issues
  - Providing feedback to users

Returns:

Dictionary containing workflow status information, including:
  • total_steps: Total number of workflow steps

  • completed_steps: Number of completed steps

  • pending_steps: Number of pending steps

  • step_details: List of dictionaries with details for each step (name and completion status)

Return type:

Dict[str, Any]
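A status dictionary with the keys listed above can be rendered as a short progress report. The keys inside each step_details entry ("name" and "complete") are assumptions, since the docs only say that each entry carries a name and a completion status.

```python
def summarize_status(status):
    """Format a workflow status dict as a checklist."""
    total = status["total_steps"]
    completed = status["completed_steps"]
    percent = 100 * completed / total if total else 0
    lines = [f"{completed}/{total} steps complete ({percent:.0f}%)"]
    for detail in status["step_details"]:
        mark = "x" if detail["complete"] else " "
        lines.append(f"[{mark}] {detail['name']}")
    return "\n".join(lines)


# Hypothetical status; the real dict comes from get_workflow_status().
status = {
    "total_steps": 2,
    "completed_steps": 1,
    "pending_steps": 1,
    "step_details": [
        {"name": "setup_project", "complete": True},
        {"name": "define_domain", "complete": False},
    ],
}
report = summarize_status(status)
```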

Usage:

from symfluence.project.workflow_orchestrator import WorkflowOrchestrator

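# Argument order follows the constructor signature: managers, config, logger
orchestrator = WorkflowOrchestrator(managers, config, logger)

# Run full workflow
orchestrator.run_workflow()

# Run specific steps by their CLI names
orchestrator.run_individual_steps(["calibrate_model"])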

# Get workflow status
status = orchestrator.get_workflow_status()

Model Registry#

The Model Registry enables plugin-style model integration.

Registering Models#

from symfluence.models.registry import ModelRegistry

# Register preprocessor
@ModelRegistry.register_preprocessor('MY_MODEL')
class MyPreProcessor:
    def __init__(self, config, logger):
        self.config = config
        self.logger = logger

    def run_preprocessing(self):
        # Preprocessing logic
        pass

# Register runner
@ModelRegistry.register_runner('MY_MODEL', method_name='run_my_model')
class MyRunner:
    def __init__(self, config, logger, reporting_manager=None):
        self.config = config
        self.logger = logger

    def run_my_model(self):
        # Model execution logic
        pass

# Register postprocessor
@ModelRegistry.register_postprocessor('MY_MODEL')
class MyPostProcessor:
    def __init__(self, config, logger, reporting_manager=None):
        self.config = config
        self.logger = logger

    def extract_streamflow(self):
        # Result extraction logic
        pass

Querying Registry#

from symfluence.models.registry import ModelRegistry

# List registered models
models = ModelRegistry.list_models()
# ['SUMMA', 'FUSE', 'GR', 'HYPE', 'NGEN', ...]

# Get specific components
preprocessor_cls = ModelRegistry.get_preprocessor('SUMMA')
runner_cls = ModelRegistry.get_runner('SUMMA')
postprocessor_cls = ModelRegistry.get_postprocessor('SUMMA')

# Check if model is registered
is_registered = ModelRegistry.is_registered('MY_MODEL')
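For readers new to the pattern, a decorator-based registry can be sketched in a few lines. MiniRegistry is a hypothetical stand-in written for this illustration and says nothing about how ModelRegistry is implemented internally; only the register_runner(name, method_name=...) call shape is taken from the examples above.

```python
class MiniRegistry:
    """Toy registry mapping model names to runner classes."""

    _runners = {}

    @classmethod
    def register_runner(cls, model_name, method_name="run"):
        def decorator(runner_cls):
            # Store the class together with the method that starts a run.
            cls._runners[model_name.upper()] = (runner_cls, method_name)
            return runner_cls  # the class itself is left unchanged
        return decorator

    @classmethod
    def get_runner(cls, model_name):
        entry = cls._runners.get(model_name.upper())
        return entry if entry else (None, None)

    @classmethod
    def is_registered(cls, model_name):
        return model_name.upper() in cls._runners

    @classmethod
    def list_models(cls):
        return sorted(cls._runners)


@MiniRegistry.register_runner("TOY_MODEL", method_name="run_toy")
class ToyRunner:
    def __init__(self, config):
        self.config = config

    def run_toy(self):
        return f"ran with {self.config}"


# A manager can now look up and invoke the runner by name alone.
runner_cls, method_name = MiniRegistry.get_runner("toy_model")
output = getattr(runner_cls({"steps": 3}), method_name)()
```

The benefit of this design is that adding a model only requires importing the module that defines it; no central list has to be edited.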

Optimization API#

Base Optimizer#

Base Model Optimizer

Abstract base class providing unified optimization infrastructure for all hydrological models. Implements template method pattern to delegate model-specific operations while centralizing algorithm execution, parallel processing, results tracking, and final evaluation workflows.

Mixin Components:

ConfigurableMixin, ParallelExecutionMixin, ResultsTrackingMixin, RetryExecutionMixin, GradientOptimizationMixin

Abstract Methods (Must Implement in Subclass):

  • _get_model_name() -> str

  • _run_model_for_final_evaluation(output_dir) -> bool

  • _get_final_file_manager_path() -> Path

Optional Overrides:

_create_parameter_manager(), _create_calibration_target(), _create_worker(), _apply_best_parameters_for_final(), _get_settings_directory()
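The template method pattern described above can be sketched with the three abstract hooks. SketchOptimizer, ToyOptimizer and run_final_evaluation are hypothetical names for illustration; the real base class takes config and logger arguments and does far more work.

```python
from abc import ABC, abstractmethod
from pathlib import Path


class SketchOptimizer(ABC):
    """Base class that fixes the workflow and defers model details to hooks."""

    def __init__(self, output_dir):
        self.output_dir = Path(output_dir)

    @abstractmethod
    def _get_model_name(self) -> str: ...

    @abstractmethod
    def _run_model_for_final_evaluation(self, output_dir) -> bool: ...

    @abstractmethod
    def _get_final_file_manager_path(self) -> Path: ...

    def run_final_evaluation(self):
        # Template method: the order of operations lives here, once.
        name = self._get_model_name()
        success = self._run_model_for_final_evaluation(self.output_dir)
        return {
            "model": name,
            "success": success,
            "file_manager": self._get_final_file_manager_path(),
        }


class ToyOptimizer(SketchOptimizer):
    def _get_model_name(self) -> str:
        return "TOY"

    def _run_model_for_final_evaluation(self, output_dir) -> bool:
        return True  # a real subclass would launch the model here

    def _get_final_file_manager_path(self) -> Path:
        return self.output_dir / "fileManager.txt"


report = ToyOptimizer("final_evaluation").run_final_evaluation()
```

Forgetting to implement any of the three hooks makes the subclass impossible to instantiate, so the mistake surfaces at construction time rather than midway through a calibration.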

class symfluence.optimization.optimizers.base_model_optimizer.BaseModelOptimizer(config, logger, optimization_settings_dir=None, reporting_manager=None)[source]#

Bases: ConfigurableMixin, ParallelExecutionMixin, ResultsTrackingMixin, RetryExecutionMixin, GradientOptimizationMixin, ABC

Abstract base class for model-specific optimizers.

Implements template method pattern providing unified optimization across all hydrological models. Uses mixins for parallel execution, results tracking, retry logic, and gradient-based optimization.

Subclasses must implement: _get_model_name(), _run_model_for_final_evaluation(), _get_final_file_manager_path(). Components (param_manager, worker, calibration_target) are created via overridable factory methods with registry-based defaults.

Parameters:
  • config (SymfluenceConfig | Dict[str, Any])

  • logger (Logger)

  • optimization_settings_dir (Path | None)

  • reporting_manager (Any | None)

DEFAULT_ITERATIONS = 100#
DEFAULT_POPULATION_SIZE = 30#
DEFAULT_PENALTY_SCORE = -9999.0#

__init__(config, logger, optimization_settings_dir=None, reporting_manager=None)[source]#

Initialize the model optimizer.

Parameters:
  • config (SymfluenceConfig | Dict[str, Any]) – Configuration (typed SymfluenceConfig or legacy dict)

  • logger (Logger) – Logger instance

  • optimization_settings_dir (Path | None) – Optional path to optimization settings

  • reporting_manager (Any | None) – ReportingManager instance

property task_builder: TaskBuilder#

Lazy-initialized task builder.

property population_evaluator: PopulationEvaluator#

Lazy-initialized population evaluator.

property results_saver: FinalResultsSaver#

Lazy-initialized results saver.

property final_orchestrator: FinalEvaluationOrchestrator#

Lazy-initialized final evaluation orchestrator.
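The lazy-initialized properties above follow a common pattern, sketched here in isolation. `LazyComponents` and `build_count` are hypothetical names used only for illustration; the real classes (TaskBuilder and the others) are replaced by a placeholder object.

```python
class LazyComponents:
    """Hypothetical holder showing a lazily created helper."""

    def __init__(self):
        self._task_builder = None  # nothing is built at construction time
        self.build_count = 0

    @property
    def task_builder(self):
        # Build on first access, then reuse the cached instance.
        if self._task_builder is None:
            self.build_count += 1
            self._task_builder = object()  # placeholder for a real builder
        return self._task_builder


holder = LazyComponents()
first = holder.task_builder
second = holder.task_builder
```

Both accesses return the same object, so construction cost is paid only if the component is actually used.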

log_iteration_progress(algorithm_name, iteration, best_score, secondary_score=None, secondary_label=None, n_improved=None, population_size=None, crash_stats=None)[source]#

Log optimization progress. Delegates to EvaluationMetricsTracker.

Parameters:
  • algorithm_name (str)

  • iteration (int)

  • best_score (float)

  • secondary_score (float | None)

  • secondary_label (str | None)

  • n_improved (int | None)

  • population_size (int | None)

  • crash_stats (Dict[str, Any] | None)

Return type:

None

log_initial_population(algorithm_name, population_size, best_score)[source]#

Log initial population completion. Delegates to EvaluationMetricsTracker.

Parameters:
  • algorithm_name (str)

  • population_size (int)

  • best_score (float)

Return type:

None

get_crash_stats()[source]#

Return crash rate statistics. Delegates to EvaluationMetricsTracker.

Return type:

Dict[str, Any]
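A minimal sketch of how crash statistics and the penalty score might interact. `CrashTracker` and the dictionary keys are assumptions for illustration, as is the rule that a missing score counts as a crash; the real EvaluationMetricsTracker may behave differently.

```python
PENALTY_SCORE = -9999.0  # mirrors DEFAULT_PENALTY_SCORE above


class CrashTracker:
    """Hypothetical tracker; the real EvaluationMetricsTracker may differ."""

    def __init__(self):
        self.total = 0
        self.crashes = 0

    def record(self, score):
        # Count every evaluation; a missing score is treated as a crash
        # and replaced by the penalty score.
        self.total += 1
        if score is None:
            self.crashes += 1
            return PENALTY_SCORE
        return score

    def get_crash_stats(self):
        rate = self.crashes / self.total if self.total else 0.0
        return {"total": self.total, "crashes": self.crashes, "crash_rate": rate}


tracker = CrashTracker()
scores = [tracker.record(s) for s in (0.71, None, 0.64, None)]
stats = tracker.get_crash_stats()
```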

run_optimization(algorithm_name)[source]#

Run optimization using a specified algorithm from the registry.

Parameters:

algorithm_name (str) – Algorithm name (case-insensitive)

Returns:

Path to results JSON file

Return type:

Path
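Case-insensitive dispatch through a registry can be sketched as follows. The `ALGORITHMS` dictionary, the `register` decorator, the stub functions, and the result paths are all hypothetical; only the idea of a name-to-algorithm lookup comes from the description above.

```python
from pathlib import Path

ALGORITHMS = {}  # hypothetical registry: lower-case name -> callable


def register(name):
    def decorator(func):
        ALGORITHMS[name.lower()] = func
        return func
    return decorator


@register("DDS")
def run_dds_stub():
    return Path("results") / "dds_results.json"  # made-up path


@register("SCE-UA")
def run_sce_stub():
    return Path("results") / "sce_results.json"  # made-up path


def run_optimization(algorithm_name: str) -> Path:
    key = algorithm_name.strip().lower()  # case-insensitive lookup
    if key not in ALGORITHMS:
        raise ValueError(f"Unknown algorithm: {algorithm_name!r}")
    return ALGORITHMS[key]()


result_path = run_optimization("dds")
```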

run_dds()[source]#

Run Dynamically Dimensioned Search (DDS) optimization.

Return type:

Path

run_pso()[source]#

Run Particle Swarm Optimization (PSO).

Return type:

Path

run_de()[source]#

Run Differential Evolution (DE) optimization.

Return type:

Path

run_sce()[source]#

Run Shuffled Complex Evolution (SCE-UA) optimization.

Return type:

Path

run_async_dds()[source]#

Run Asynchronous Parallel DDS optimization.

Return type:

Path

run_nsga2()[source]#

Run NSGA-II multi-objective optimization.

Return type:

Path

run_cmaes()[source]#

Run CMA-ES (Covariance Matrix Adaptation Evolution Strategy) optimization.

Return type:

Path

run_dream()[source]#

Run DREAM (DiffeRential Evolution Adaptive Metropolis) optimization.

Return type:

Path

run_glue()[source]#

Run GLUE (Generalized Likelihood Uncertainty Estimation) analysis.

Return type:

Path

run_basin_hopping()[source]#

Run Basin Hopping global optimization.

Return type:

Path

run_nelder_mead()[source]#

Run Nelder-Mead simplex optimization.

Return type:

Path

run_ga()[source]#

Run Genetic Algorithm (GA) optimization.

Return type:

Path

run_bayesian_opt()[source]#

Run Bayesian Optimization with Gaussian Process surrogate.

Return type:

Path

run_moead()[source]#

Run MOEA/D (Multi-Objective Evolutionary Algorithm based on Decomposition).

Return type:

Path

run_simulated_annealing()[source]#

Run Simulated Annealing optimization.

Return type:

Path

run_abc()[source]#

Run Approximate Bayesian Computation (ABC-SMC) for likelihood-free inference.

Return type:

Path

run_adam(steps=100, lr=0.01)[source]#

Run Adam gradient-based optimization.

Parameters:
  • steps (int) – Number of optimization steps (passed via config ADAM_STEPS)

  • lr (float) – Learning rate (passed via config ADAM_LR)

Returns:

Path to results file

Return type:

Path

run_lbfgs(steps=50, lr=0.1)[source]#

Run L-BFGS gradient-based optimization.

Parameters:
  • steps (int) – Maximum number of steps (passed via config LBFGS_STEPS)

  • lr (float) – Initial step size (passed via config LBFGS_LR)

Returns:

Path to results file

Return type:

Path

run_final_evaluation(best_params)[source]#

Run final evaluation with best parameters over full experiment period.

Evaluates the model on both calibration and evaluation windows, then restores settings for reproducibility. Subclasses may override for custom behavior.

Parameters:

best_params (Dict[str, float]) – Best parameters from optimization (physical units)

Returns:

Dict with ‘final_metrics’, ‘calibration_metrics’, ‘evaluation_metrics’, ‘success’, ‘best_params’, or None if failed

Return type:

Dict[str, Any] | None

cleanup()[source]#

Cleanup parallel processing directories and temporary files.

Return type:

None

DDS Algorithm#

Dynamically Dimensioned Search (DDS) is accessed through the BaseModelOptimizer interface. Model-specific optimizers inherit from BaseModelOptimizer and use the DDS algorithm via the run_dds() method.

Usage:

# DDS is invoked through model-specific optimizers
from symfluence.optimization import OptimizationManager

opt_manager = OptimizationManager(config, logger)
results = opt_manager.calibrate_model()  # Uses algorithm from config

# Or directly via model optimizer
# optimizer.run_dds()  # Runs DDS optimization
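For readers unfamiliar with DDS, the core loop can be sketched in plain Python. This is a simplified illustration written for maximization and is not SYMFLUENCE's implementation; `dds_maximize`, `toy_objective`, and the default values of `max_iter` and `r` are assumptions.

```python
import math
import random


def dds_maximize(objective, lower, upper, max_iter=200, r=0.2, seed=0):
    """Minimal DDS sketch; not SYMFLUENCE's implementation."""
    if max_iter < 2:
        raise ValueError("max_iter must be at least 2")
    rng = random.Random(seed)
    n = len(lower)
    best = [rng.uniform(lo, hi) for lo, hi in zip(lower, upper)]
    best_score = objective(best)
    for i in range(1, max_iter + 1):
        # Probability of perturbing each dimension shrinks with iterations.
        p = 1.0 - math.log(i) / math.log(max_iter)
        dims = [j for j in range(n) if rng.random() < p]
        if not dims:
            dims = [rng.randrange(n)]  # always perturb at least one dimension
        candidate = list(best)
        for j in dims:
            span = upper[j] - lower[j]
            x = best[j] + r * span * rng.gauss(0.0, 1.0)
            # Reflect at the bounds; fall back to the bound if still outside.
            if x < lower[j]:
                x = lower[j] + (lower[j] - x)
                if x > upper[j]:
                    x = lower[j]
            elif x > upper[j]:
                x = upper[j] - (x - upper[j])
                if x < lower[j]:
                    x = upper[j]
            candidate[j] = x
        score = objective(candidate)
        if score >= best_score:  # greedy acceptance
            best, best_score = candidate, score
    return best, best_score


def toy_objective(params):
    # Concave toy function with its maximum (0.0) at (1.0, -2.0).
    return -((params[0] - 1.0) ** 2 + (params[1] + 2.0) ** 2)


best_params, best_value = dds_maximize(toy_objective, [-5.0, -5.0], [5.0, 5.0])
```

The search starts globally (many dimensions perturbed) and becomes local as the perturbation probability falls, which is why DDS needs no tuning beyond the neighbourhood size `r`.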

Algorithm Selection#

# Configure algorithm in YAML
# OPTIMIZATION_ALGORITHM: DDS  # or DE, PSO, SCE-UA, NSGA-II

# Programmatic algorithm selection
from symfluence.optimization.optimization_manager import OptimizationManager

opt = OptimizationManager(config, logger)

# Commonly used algorithms (not exhaustive; BaseModelOptimizer exposes one run_*() method per algorithm)
algorithms = ['DDS', 'DE', 'PSO', 'SCE-UA', 'NSGA-II', 'ADAM', 'LBFGS']

Data Acquisition#

Acquisition Service#


Unified facade for all data acquisition workflows in SYMFLUENCE. Coordinates downloading and processing of geospatial attributes, forcing data, and observations from diverse sources (cloud, HPC, local). Acts as high-level orchestrator delegating to specialized acquisition handlers and cloud downloaders.

Architecture:

AcquisitionService provides two parallel acquisition paths:

  1. CLOUD Mode (CloudForcingDownloader):
     - Cloud-based data providers with direct HTTP/S3 access
     - DEM sources: Copernicus GLO-30/90, FABDEM, NASADEM, SRTM, ETOPO, Mapzen, ALOS
     - Soil class: SoilGrids via WCS subsetting
     - Land cover: MODIS Landcover (multi-year mode), USGS NLCD
     - Forcing: ERA5 (CDS), CARRA/CERRA (CDS), AORC (AWS/GCS), NEX-GDDP (Zenodo)
     - Observations: USGS, WSC, SMHI, SNOTEL, GRACE, MODIS snow/ET

  2. MAF Mode (gistoolRunner, datatoolRunner):
     - HPC-based data access via external MAF tools on supercomputers
     - gistool: MERIT-Hydro elevation, MODIS landcover, SoilGrids soil class
     - datatool: ERA5, RDRS, CASR forcing data with Slurm job monitoring
     - Configuration: Generates MAF JSON configs and executes MAF scheduler
     - Output: Same directory structure as CLOUD mode

Data Acquisition Workflows:
  1. Attribute Acquisition (acquire_attributes)
     - DEM/elevation: Multiple sources with fallback logic
     - Soil classification: SoilGrids primary, gistool fallback
     - Land cover: MODIS or USGS depending on availability
     - Output: GeoTIFF rasters at project_dir/attributes/{type}/

  2. Forcing Data Download (acquire_forcings)
     - Datasets: ERA5, CARRA, CERRA, AORC, NEX-GDDP
     - Mode selection: CLOUD vs MAF based on config.domain.data_access
     - Caching: RawForcingCache with automatic TTL/checksum validation
     - Unit conversion: Via VariableHandler for dataset-specific mappings
     - Output: NetCDF at project_dir/forcing/{dataset}_raw/

  3. Observation Data Retrieval (acquire_observations)
     - Streamflow: USGS (NWIS), WSC (Canada), SMHI (Nordic)
     - Gridded: GRACE, MODIS Snow, MODIS ET, FLUXNET
     - Point sensors: SNOTEL (USDA NRCS snow/precip/temp)
     - Output: CSV at project_dir/observations/{type}/processed/

  4. EM-Earth Supplementary Data (acquire_em_earth_forcings)
     - Gridded ERA5 re-analysis supplementing point/coarse data
     - Subsetting: Via bounding box
     - Averaging: Spatial mean over domain
     - Output: NetCDF at project_dir/forcing/em_earth_supplementary/

Configuration Parameters:
Data Source Selection:

  • domain.data_access: ‘CLOUD’ or ‘MAF’ (default: ‘MAF’)

  • domain.dem_source: ‘merit_hydro’, ‘copernicus’, ‘copdem90’, ‘fabdem’, ‘nasadem’, ‘srtm’, ‘etopo’, ‘mapzen’, ‘alos’

  • domain.land_class_source: ‘modis’, ‘usgs_nlcd’ (cloud only)

  • domain.bounding_box_coords: ‘lat_min/lon_min/lat_max/lon_max’
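The bounding box string can be parsed and sanity-checked before any download starts. The sketch below assumes the field order documented above; `BoundingBox`, `parse_bounding_box`, and the sample coordinates are hypothetical, and the simple ordering check does not handle boxes that cross the antimeridian.

```python
from typing import NamedTuple


class BoundingBox(NamedTuple):
    lat_min: float
    lon_min: float
    lat_max: float
    lon_max: float


def parse_bounding_box(text: str) -> BoundingBox:
    """Parse 'lat_min/lon_min/lat_max/lon_max' and check the ordering."""
    parts = text.split("/")
    if len(parts) != 4:
        raise ValueError(f"Expected four '/'-separated values, got {len(parts)}")
    box = BoundingBox(*(float(p) for p in parts))
    if box.lat_min >= box.lat_max or box.lon_min >= box.lon_max:
        raise ValueError("Minimum coordinates must be smaller than maximum coordinates")
    return box


bbox = parse_bounding_box("50.9/-116.6/51.8/-115.4")  # made-up coordinates
```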

Download Flags:

  • domain.download_dem: Enable DEM acquisition (default: True)

  • domain.download_soil: Enable soil class acquisition (default: True)

  • domain.download_landcover: Enable land cover acquisition (default: True)

Observation Sources:

  • optimization.observation_variables: List of variables to download

  • evaluation.targets: Evaluation targets (e.g., ‘streamflow’)

MAF Configuration:

  • domain.hpc_account: HPC account for job submission

  • domain.hpc_cache_dir: HPC cache directory

  • domain.hpc_job_timeout: Max seconds to wait for jobs

Caching and Error Handling:

Raw Forcing Cache:
  • RawForcingCache manages downloaded forcing files

  • TTL: Files cached for a configurable duration (default: 30 days)

  • Validation: Checksum-based integrity checking

  • Fallback: Automatic re-download if the cache is corrupted
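The TTL and checksum checks can be sketched with the standard library. This is not the RawForcingCache implementation: `is_cache_entry_valid`, `sha256_of`, and the choice of SHA-256 are assumptions, and only the 30-day default is taken from the text.

```python
import hashlib
import time

TTL_SECONDS = 30 * 24 * 3600  # 30 days, matching the documented default


def sha256_of(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


def is_cache_entry_valid(data, expected_checksum, stored_at, now=None, ttl=TTL_SECONDS):
    """Return True only if the entry is fresh and its checksum matches."""
    now = time.time() if now is None else now
    if now - stored_at > ttl:
        return False  # expired: caller should re-download
    return sha256_of(data) == expected_checksum  # mismatch means corruption


payload = b"forcing-bytes"
checksum = sha256_of(payload)
fresh_ok = is_cache_entry_valid(payload, checksum, stored_at=0.0, now=10.0)
expired = is_cache_entry_valid(payload, checksum, stored_at=0.0, now=TTL_SECONDS + 1.0)
corrupted = is_cache_entry_valid(b"truncated", checksum, stored_at=0.0, now=10.0)
```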

Error Recovery:
  • Network failures: Retry with exponential backoff

  • Partial downloads: Cleanup and retry

  • Missing data: Warn and continue with available sources

  • Configuration errors: Validate early and report clearly
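Retry with exponential backoff can be sketched as below. `retry_with_backoff`, its defaults, and the use of `ConnectionError` as the retryable exception are assumptions made for illustration; the sleep function is injectable so the example runs without waiting.

```python
import time


def retry_with_backoff(func, max_attempts=4, base_delay=1.0, sleep=time.sleep):
    """Call func until it succeeds, doubling the wait after each failure."""
    for attempt in range(max_attempts):
        try:
            return func()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)


calls = {"n": 0}
delays = []


def flaky_download():
    # Fails twice, then succeeds (simulated network hiccups).
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "downloaded"


result = retry_with_backoff(flaky_download, sleep=delays.append)
```

Here the recorded waits are 1.0 and 2.0 seconds before the third attempt succeeds.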

Examples

>>> # Create service and run all acquisitions
>>> from symfluence.data.acquisition.acquisition_service import AcquisitionService
>>> acq = AcquisitionService(config, logger, reporting_manager=reporter)
>>> acq.acquire_attributes()
>>> acq.acquire_forcings()
>>> acq.acquire_observations()
>>> acq.acquire_em_earth_forcings()
>>> # Cloud-only mode (faster for small domains)
>>> # Set config.domain.data_access = 'CLOUD'
>>> acq.acquire_attributes()
>>> # MAF mode (for large domains on HPC)
>>> # Set config.domain.data_access = 'MAF'
>>> acq.acquire_attributes()

References

  • MERIT-Hydro: Yamazaki et al. (2019) Water Resources Research

  • Copernicus DEM: https://copernicus-dem-30m.s3.amazonaws.com/

  • FABDEM: Hawker et al. (2022) Environmental Research Letters

  • SoilGrids: Poggio et al. (2021) SOIL

  • MODIS: Justice et al. (2002) Remote Sensing of Environment

class symfluence.data.acquisition.acquisition_service.AcquisitionService(config, logger, reporting_manager=None)[source]#

Bases: ConfigurableMixin

Unified data acquisition service for all SYMFLUENCE data needs.

High-level facade orchestrating geospatial attributes, forcing data, and observation data acquisition from multiple sources (cloud, HPC, local). Provides flexible acquisition modes (CLOUD vs MAF) and handles caching, error recovery, and visualization.

Acquisition Modes:

CLOUD Mode:
  • Direct HTTP/S3 access to cloud providers

  • Faster for small domains, requires internet access

  • DEM sources: Copernicus GLO-30/90, FABDEM, NASADEM, SRTM, ETOPO, Mapzen, ALOS

  • Forcing: ERA5 (CDS), CARRA/CERRA, AORC, NEX-GDDP

  • Suitable for research, testing, small basins

MAF Mode:
  • HPC-based via external MAF tools (gistool, datatool)

  • Better for large domains, requires HPC access

  • Same output format as CLOUD mode

  • Handles job queuing and monitoring via Slurm

  • Suitable for operational, large-scale applications

Data Acquisition Methods:

  • acquire_attributes(): Geospatial attributes (DEM, soil, landcover)

  • acquire_forcings(): Meteorological forcing data (ERA5, CARRA, etc.)

  • acquire_observations(): Validation data (streamflow, GRACE, SNOTEL, etc.)

  • acquire_em_earth_forcings(): Supplementary forcing from EM-Earth

Key Features:
  • Multi-source geospatial data with automatic fallbacks

  • Caching with TTL and checksum-based validation

  • Parallel downloading where supported

  • Progress visualization via reporting_manager

  • Comprehensive error handling and logging

  • Configuration-driven mode selection

Parameters:
  • config (SymfluenceConfig | Dict[str, Any])

  • logger (Logger)

  • reporting_manager (Any)

config#

Typed SymfluenceConfig instance

logger#

Logger for acquisition progress tracking

data_dir#

Root data directory (from config.system.data_dir)

domain_name#

Domain identifier (from config.domain.name)

project_dir#

Project-specific directory (data_dir/domain_{domain_name})

reporting_manager#

Optional visualization manager

variable_handler#

VariableHandler for dataset-specific unit conversion

Configuration:

  • domain.data_access: ‘CLOUD’ or ‘MAF’ (default: ‘MAF’)

  • domain.dem_source: DEM provider (‘merit_hydro’, ‘copernicus’, ‘copdem90’, ‘fabdem’, ‘nasadem’, ‘srtm’, ‘etopo’, ‘mapzen’, ‘alos’)

  • domain.land_class_source: Land cover provider (‘modis’, ‘usgs_nlcd’)

  • domain.download_dem: Enable DEM acquisition (default: True)

  • domain.download_soil: Enable soil class (default: True)

  • domain.download_landcover: Enable land cover (default: True)

Examples

>>> # Create service with config and logger
>>> acq = AcquisitionService(config, logger, reporting_manager=reporter)
>>> # Run complete acquisition workflow
>>> acq.acquire_attributes()   # DEM, soil, landcover
>>> acq.acquire_forcings()     # ERA5, CARRA, etc.
>>> acq.acquire_observations() # Streamflow, GRACE, etc.
>>> acq.acquire_em_earth_forcings()  # Supplementary data
>>> # Cloud-only mode (small domain)
>>> config.domain.data_access = 'CLOUD'
>>> acq.acquire_attributes()
>>> # MAF mode (large domain on HPC)
>>> config.domain.data_access = 'MAF'
>>> acq.acquire_forcings()

See also

  • CloudForcingDownloader: Cloud-based data source handlers

  • gistoolRunner: HPC geospatial data extraction

  • datatoolRunner: HPC forcing data extraction

  • RawForcingCache: Forcing data caching system

acquire_attributes()[source]#

Acquire geospatial attributes including DEM, soil, and land cover data.

acquire_forcings()[source]#

Acquire forcing data for the model simulation.

acquire_observations()[source]#

Acquire additional observations based on configuration. This handles registry-based observations (GRACE, MODIS, etc.) that require an ‘acquire’ step before processing.

acquire_em_earth_forcings()[source]#

Acquire EM-Earth precipitation and temperature data.

Available Data Sources:

# Forcing datasets
forcing_sources = [
    'ERA5',        # ECMWF reanalysis
    'ERA5-Land',   # High-resolution land reanalysis
    'RDRS',        # Regional Deterministic Reanalysis System
    'CARRA',       # Copernicus Arctic Regional Reanalysis
    'AORC',        # Analysis of Record for Calibration
    'CONUS404',    # CONUS 404 dataset
    'HRRR',        # High-Resolution Rapid Refresh
    'EM-Earth',    # Ensemble Meteorological Dataset for Planet Earth
    'NEX-GDDP',    # NASA climate projections
]

# Observation datasets
obs_sources = [
    'USGS',        # US Geological Survey streamflow
    'WSC',         # Water Survey of Canada
    'GRDC',        # Global Runoff Data Centre
    'MODIS',       # Remote sensing products
    'GRACE',       # Gravity recovery data
]

Acquisition Handlers#

from symfluence.data.acquisition import AcquisitionRegistry

# Get available handlers
handlers = AcquisitionRegistry.list_handlers()

# Get specific handler
era5_handler = AcquisitionRegistry.get_handler('ERA5')

Geospatial Operations#

Domain Discretization#

Domain discretization core module for Hydrologic Response Unit (HRU) creation.

Provides the DomainDiscretizer class for subdividing catchments into HRUs based on elevation bands, soil classes, land cover, aspect, or radiation.

class symfluence.geospatial.discretization.core.DomainDiscretizer(config, logger)[source]#

Bases: PathResolverMixin

A class for discretizing a domain into Hydrologic Response Units (HRUs).

This class provides methods for various types of domain discretization, including elevation-based, soil class-based, land class-based, and radiation-based discretization. HRUs are allowed to be MultiPolygons, meaning spatially disconnected areas with the same attributes are grouped into single HRUs.

config#

Configuration dictionary.

Type:

Dict[str, Any]

logger#

Logger object for logging information and errors.

root_path#

Root path for the project.

Type:

Path

domain_name#

Name of the domain being processed.

Type:

str

project_dir#

Directory for the current project.

Type:

Path

sort_catchment_shape()[source]#

Sort the catchment shapefile based on GRU and HRU IDs.

This method performs the following steps:
  1. Loads the catchment shapefile

  2. Sorts the shapefile based on GRU and HRU IDs

  3. Saves the sorted shapefile back to the original location

The method uses GRU and HRU ID column names specified in the configuration.

Raises:
  • FileNotFoundError – If the catchment shapefile is not found.

  • ValueError – If the required ID columns are not present in the shapefile.
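The sort-and-validate step can be sketched on plain dictionaries instead of a shapefile. `sort_by_gru_and_hru` is a hypothetical helper, and the column names `GRU_ID` and `HRU_ID` are assumed defaults; in SYMFLUENCE they come from the configuration.

```python
def sort_by_gru_and_hru(records, gru_col="GRU_ID", hru_col="HRU_ID"):
    """Sort HRU records by GRU ID, then HRU ID (column names are assumed)."""
    for row in records:
        missing = [c for c in (gru_col, hru_col) if c not in row]
        if missing:
            raise ValueError(f"Missing required ID columns: {missing}")
    return sorted(records, key=lambda row: (row[gru_col], row[hru_col]))


hrus = [
    {"GRU_ID": 2, "HRU_ID": 1},
    {"GRU_ID": 1, "HRU_ID": 2},
    {"GRU_ID": 1, "HRU_ID": 1},
]
ordered = sort_by_gru_and_hru(hrus)
```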

discretize_domain()[source]#

Discretize domain into Hydrologic Response Units (HRUs).

Creates HRUs by subdividing the catchment based on specified attributes. Supports multiple discretization methods that can be combined:

Single-Attribute Methods:
  • ‘lumped’: Single HRU for entire catchment

  • ‘elevation’: HRUs based on elevation bands

  • ‘landclass’: HRUs based on land cover classes

  • ‘soilclass’: HRUs based on soil type classes

  • ‘aspect’: HRUs based on aspect classes

  • ‘radiation’: HRUs based on potential radiation

Multi-Attribute Methods:
  • ‘elevation,landclass’: Combination of elevation and land cover

  • ‘elevation,soilclass’: Combination of elevation and soil type

  • Any comma-separated combination of attributes

Process:
  1. Check for existing custom catchment shapefile

  2. Load or create base catchment geometry

  3. Apply discretization method(s)

  4. Generate GRU (Grouped Response Unit) and HRU IDs

  5. Calculate HRU statistics and attributes

  6. Save shapefile with discretization results

Returns:

Path to generated HRU shapefile, or None if using existing shapefile

Raises:
  • ValueError – If discretization method not recognized

  • FileNotFoundError – If required input files (DEM, land cover, etc.) not found

  • Exception – If discretization process fails

Return type:

Path | None

Note

  • HRUs can be MultiPolygons (spatially disconnected areas with same attributes)

  • Minimum HRU size controlled by MIN_HRU_SIZE config parameter

  • Output shapefile includes: geometry, GRU_ID, HRU_ID, area, and attribute values

  • Files saved to: {project_dir}/shapefiles/catchment/{domain_name}_HRUs_{method}.shp

Example

For SUB_GRID_DISCRETIZATION="elevation,landclass", creates HRUs by:
  1. Dividing catchment into elevation bands

  2. Within each elevation band, subdividing by land cover class

  3. Merging small HRUs below MIN_HRU_SIZE threshold
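The combined elevation and land class example can be sketched on a handful of raster cells. Everything here is illustrative: `discretize`, the 200 m band size, the sample cells, and the rule of merging a small HRU into the largest HRU of the same elevation band are assumptions, not the DomainDiscretizer logic.

```python
from collections import defaultdict


def discretize(cells, band_size=200.0, min_hru_area=0.0):
    """Group cells by (elevation band, land class) and sum their areas."""
    hrus = defaultdict(float)
    for cell in cells:
        band = int(cell["elevation"] // band_size)  # 0 -> [0, 200), 1 -> [200, 400), ...
        hrus[(band, cell["landclass"])] += cell["area"]
    # Merge HRUs below the threshold into the largest HRU of the same band.
    for key in [k for k, area in hrus.items() if area < min_hru_area]:
        same_band = [k for k in hrus if k[0] == key[0] and k != key]
        if same_band:
            target = max(same_band, key=lambda k: hrus[k])
            hrus[target] += hrus.pop(key)
    return dict(hrus)


cells = [  # made-up raster cells: elevation (m), land class code, area (km2)
    {"elevation": 150.0, "landclass": 1, "area": 4.0},
    {"elevation": 180.0, "landclass": 1, "area": 3.0},
    {"elevation": 190.0, "landclass": 2, "area": 0.5},
    {"elevation": 250.0, "landclass": 2, "area": 6.0},
]
hrus = discretize(cells, band_size=200.0, min_hru_area=1.0)
```

The 0.5 km2 unit in the lowest band falls under the threshold and is absorbed by its larger neighbour, leaving two HRUs.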

class symfluence.geospatial.discretization.core.DomainDiscretizationRunner(config, logger)[source]#

Bases: object

Wraps domain discretization with explicit artifact tracking.

Parameters:
  • config (Dict[str, Any])

  • logger (Any)

discretize_domain()[source]#
Return type:

Tuple[object | Dict[str, object] | None, DiscretizationArtifacts]

Discretization Methods:

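# Available discretization approaches
methods = [
    'lumped',               # Single unit
    'GRUs',                 # Grouped Response Units
    'elevation',            # Elevation bands
    'landclass',            # Land cover classes
    'soilclass',            # Soil type classes
    'aspect',               # Aspect classes
    'radiation',            # Radiation-based
    'elevation,landclass',  # Multiple criteria: comma-separated combination
]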

Evaluation#

Evaluators#

Base Model Evaluator

This module provides the abstract base class for different evaluation variables.

class symfluence.evaluation.evaluators.base.ModelEvaluator(config, project_dir=None, logger=None)[source]#

Bases: ConfigurableMixin, ABC

Abstract base class for hydrological model evaluation.

Provides standardized infrastructure for comparing simulated and observed data across different hydrological variables (streamflow, snow, ET, etc.). Handles time series alignment, period-based evaluation (calibration/validation), and multi-metric calculation using the centralized metrics module.

Subclasses must implement:
  • get_simulation_files(): Locate model output files

  • extract_simulated_data(): Parse simulation results

  • get_observed_data_path(): Locate observation files

  • needs_routing(): Whether mizuRoute output is required

  • _get_observed_data_column(): Identify data column in obs files

config#

SymfluenceConfig instance with typed access

calibration_period#

Tuple of (start, end) timestamps for calibration

evaluation_period#

Tuple of (start, end) timestamps for validation

eval_timestep#

Target timestep for comparison (‘native’, ‘hourly’, ‘daily’)

property variable_type: str#

Return the variable type for resampling behavior.

Override in subclasses for flux variables (precipitation, ET) that should use sum aggregation instead of mean.

Returns:

  • ‘state’ (default) for state variables: use mean aggregation

  • ‘flux’ for flux/accumulation variables: use sum aggregation
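The difference between the two aggregation modes is easiest to see on constant hourly data. `aggregate_daily` is a hypothetical helper written without pandas; the real evaluator's resampling code may differ.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def aggregate_daily(series, variable_type="state"):
    """Aggregate (timestamp, value) pairs to daily values."""
    buckets = defaultdict(list)
    for stamp, value in series:
        buckets[stamp.date()].append(value)
    if variable_type == "flux":
        return {day: sum(vals) for day, vals in buckets.items()}
    return {day: sum(vals) / len(vals) for day, vals in buckets.items()}


start = datetime(2020, 1, 1)
hourly = [(start + timedelta(hours=h), 2.0) for h in range(24)]  # constant toy data

daily_state = aggregate_daily(hourly, "state")  # e.g. snow water equivalent
daily_flux = aggregate_daily(hourly, "flux")    # e.g. precipitation per step
```

With 24 hourly values of 2.0, the state aggregate stays at 2.0 while the flux aggregate accumulates to 48.0.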

evaluate(sim, obs=None, mizuroute_dir=None, calibration_only=True)[source]#

Alias for calculate_metrics(), kept for consistency with other parts of the system.

Parameters:
  • sim (Any)

  • obs (pandas.Series | None)

  • mizuroute_dir (Path | None)

  • calibration_only (bool)

Return type:

Dict[str, float] | None

calculate_metrics(sim, obs=None, mizuroute_dir=None, calibration_only=True)[source]#

Calculate performance metrics for this target.

Parameters:
  • sim (Any) – Either a Path to simulation directory or a pre-loaded pd.Series

  • obs (pandas.Series | None) – Optional pre-loaded pd.Series of observations. If None, loads from file.

  • mizuroute_dir (Path | None) – mizuRoute simulation directory (if needed and sim is Path)

  • calibration_only (bool) – If True, only calculate calibration period metrics

Return type:

Dict[str, float] | None

abstractmethod get_simulation_files(sim_dir)[source]#

Get relevant simulation output files for this target

Parameters:

sim_dir (Path)

Return type:

List[Path]

abstractmethod extract_simulated_data(sim_files, **kwargs)[source]#

Extract simulated data from output files

Parameters:

sim_files (List[Path])

Return type:

pandas.Series

abstractmethod get_observed_data_path()[source]#

Get path to observed data file

Return type:

Path

abstractmethod needs_routing()[source]#

Whether this target requires mizuRoute routing

Return type:

bool

align_series(sim, obs)[source]#

Align simulation and observation series after dropping spinup years.

Parameters:
  • sim (pandas.Series)

  • obs (pandas.Series)

Return type:

Tuple[pandas.Series, pandas.Series]
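A sketch of the alignment idea using plain dictionaries keyed by date. This `align_series` is a stand-in, not the method above: it assumes spinup is counted in whole calendar years from the first simulated year and that alignment means keeping the dates present in both series.

```python
from datetime import date


def align_series(sim, obs, spinup_years=1):
    """Drop leading spinup years from sim, then keep shared dates only."""
    first_year = min(sim).year
    cutoff = date(first_year + spinup_years, 1, 1)
    common = sorted(d for d in sim if d >= cutoff and d in obs)
    return {d: sim[d] for d in common}, {d: obs[d] for d in common}


sim = {date(2000, 6, 1): 1.0, date(2001, 6, 1): 2.0, date(2002, 6, 1): 3.0}
obs = {date(2001, 6, 1): 2.5, date(2002, 6, 1): 2.9, date(2003, 6, 1): 4.0}
sim_aligned, obs_aligned = align_series(sim, obs)
```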

Available Evaluators:

from symfluence.evaluation.evaluators import (
    StreamflowEvaluator,
    ETEvaluator,
    SnowEvaluator,
    SoilMoistureEvaluator,
    GroundwaterEvaluator,
    TWSEvaluator,
)

# Initialize evaluator
evaluator = StreamflowEvaluator(config, project_dir, logger)

# Evaluate simulation
metrics = evaluator.evaluate(sim_dir)
# Returns a dict of metrics, e.g. {'KGE': 0.85, 'NSE': 0.82, 'RMSE': 12.5, ...}, or None

Metrics#

# Available metrics
metrics = [
    'KGE',      # Kling-Gupta Efficiency
    'KGEnp',    # Non-parametric KGE
    'NSE',      # Nash-Sutcliffe Efficiency
    'RMSE',     # Root Mean Square Error
    'MAE',      # Mean Absolute Error
    'PBIAS',    # Percent Bias
    'R2',       # Coefficient of Determination
]
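For reference, several of these metrics can be written in a few lines of plain Python using their standard definitions. This is a sketch, not the centralized metrics module: the function names are hypothetical, population standard deviations are used, and the PBIAS sign convention (positive for overestimation) is an assumption since conventions differ between packages.

```python
import math


def _mean(values):
    return sum(values) / len(values)


def _std(values):
    m = _mean(values)
    return math.sqrt(sum((v - m) ** 2 for v in values) / len(values))


def nse(sim, obs):
    m = _mean(obs)
    return 1.0 - sum((o - s) ** 2 for s, o in zip(sim, obs)) / sum((o - m) ** 2 for o in obs)


def rmse(sim, obs):
    return math.sqrt(_mean([(s - o) ** 2 for s, o in zip(sim, obs)]))


def pbias(sim, obs):
    # Sign convention assumed here: positive means the simulation overestimates.
    return 100.0 * sum(s - o for s, o in zip(sim, obs)) / sum(obs)


def kge(sim, obs):
    ms, mo = _mean(sim), _mean(obs)
    cov = _mean([(s - ms) * (o - mo) for s, o in zip(sim, obs)])
    r = cov / (_std(sim) * _std(obs))   # linear correlation
    alpha = _std(sim) / _std(obs)       # variability ratio
    beta = ms / mo                      # bias ratio
    return 1.0 - math.sqrt((r - 1) ** 2 + (alpha - 1) ** 2 + (beta - 1) ** 2)


obs = [1.0, 2.0, 3.0, 4.0]
perfect = list(obs)
biased = [o + 1.0 for o in obs]
```

A constant offset leaves correlation and variability untouched, so for `biased` the KGE penalty comes entirely from the bias ratio term.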

Reporting#

Reporting Manager#

Central reporting facade for coordinating all SYMFLUENCE visualizations.

Provides a unified interface for generating publication-ready visualizations across all modeling stages: domain setup, calibration, evaluation, and multi-model comparison. Implements the Facade pattern to orchestrate specialized plotters while hiding complexity from client code.

Heavy lifting is delegated to three orchestrators:
  • ModelOutputOrchestrator: registry-based model output dispatch

  • CalibrationOrchestrator: post-calibration target dispatch and comparison plots

  • DiagnosticsOrchestrator: per-workflow-step diagnostic validation plots

class symfluence.reporting.reporting_manager.ReportingManager(config, logger, visualize=False, diagnostic=False)[source]#

Bases: ConfigMixin

Central facade coordinating all visualization and reporting in SYMFLUENCE.

Orchestrates diverse visualization workflows by delegating to specialized plotters for domain maps, calibration analysis, performance benchmarking, and diagnostics. Uses Facade and Lazy Initialization patterns.

Example

>>> rm = ReportingManager(config, logger, visualize=True)
>>> rm.visualize_domain()                # Generate domain overview map
>>> rm.visualize_calibration_results()   # Generate post-calibration plots

__init__(config, logger, visualize=False, diagnostic=False)[source]#

Initialize the ReportingManager.

Parameters:
  • config (SymfluenceConfig) – SymfluenceConfig instance.

  • logger (Any) – Logger instance.

  • visualize (bool) – Boolean flag indicating if visualization is enabled.

  • diagnostic (bool) – Boolean flag indicating if diagnostic mode is enabled.

property plot_config: PlotConfig#

Lazy initialization of plot configuration.

property data_processor: DataProcessor#

Lazy initialization of data processor.

property spatial_processor: SpatialProcessor#

Lazy initialization of spatial processor.

property domain_plotter: DomainPlotter#

Lazy initialization of domain plotter.

property optimization_plotter: OptimizationPlotter#

Lazy initialization of optimization plotter.

property analysis_plotter: AnalysisPlotter#

Lazy initialization of analysis plotter.

property benchmark_plotter: BenchmarkPlotter#

Lazy initialization of benchmark plotter.

property snow_plotter: SnowPlotter#

Lazy initialization of snow plotter.

property diagnostic_plotter: DiagnosticPlotter#

Lazy initialization of diagnostic plotter.

property model_comparison_plotter: ModelComparisonPlotter#

Lazy initialization of model comparison plotter.

property forcing_comparison_plotter: ForcingComparisonPlotter#

Lazy initialization of forcing comparison plotter.

property workflow_diagnostic_plotter: WorkflowDiagnosticPlotter#

Lazy initialization of workflow diagnostic plotter.

visualize_data_distribution(data, variable_name, stage)[source]#

Visualize data distribution (histogram/boxplot).

Parameters:
  • data (Any)

  • variable_name (str)

  • stage (str)

Return type:

None

visualize_spatial_coverage(raster_path, variable_name, stage)[source]#

Visualize spatial coverage of raster data.

Parameters:
  • raster_path (Path)

  • variable_name (str)

  • stage (str)

Return type:

None

visualize_forcing_comparison(raw_forcing_file, remapped_forcing_file, forcing_grid_shp, hru_shp, variable='precipitation_flux', time_index=0)[source]#

Visualize raw vs. remapped forcing data comparison.

Parameters:
  • raw_forcing_file (Path)

  • remapped_forcing_file (Path)

  • forcing_grid_shp (Path)

  • hru_shp (Path)

  • variable (str)

  • time_index (int)

Return type:

str | None

is_visualization_enabled()[source]#

Check if visualization is enabled.

Return type:

bool

update_sim_reach_id(config_path=None)[source]#

Update the SIM_REACH_ID in both the config object and YAML file.

Parameters:

config_path (str | None)

Return type:

int | None

visualize_domain()[source]#

Visualize the domain boundaries and features.

Return type:

str | None

visualize_discretized_domain(discretization_method)[source]#

Visualize the discretized domain (HRUs/GRUs).

Parameters:

discretization_method (str)

Return type:

str | None

visualize_model_outputs(model_outputs, obs_files)[source]#

Visualize model outputs (streamflow comparison).

Parameters:
  • model_outputs (List[Tuple[str, str]])

  • obs_files (List[Tuple[str, str]])

Return type:

str | None

visualize_lumped_model_outputs(model_outputs, obs_files)[source]#

Visualize lumped model outputs.

Parameters:
  • model_outputs (List[Tuple[str, str]])

  • obs_files (List[Tuple[str, str]])

Return type:

str | None

visualize_fuse_outputs(model_outputs, obs_files)[source]#

Visualize FUSE model outputs.

Parameters:
  • model_outputs (List[Tuple[str, str]])

  • obs_files (List[Tuple[str, str]])

Return type:

str | None

visualize_summa_outputs(experiment_id)[source]#

Visualize SUMMA model outputs (all variables).

Parameters:

experiment_id (str)

Return type:

Dict[str, str]

visualize_ngen_results(sim_df, obs_df, experiment_id, results_dir)[source]#

Visualize NGen streamflow plots.

Parameters:
  • sim_df (Any)

  • obs_df (Any | None)

  • experiment_id (str)

  • results_dir (Path)

Return type:

None

visualize_lstm_results(results_df, obs_streamflow, obs_snow, use_snow, output_dir, experiment_id)[source]#

Visualize LSTM simulation results.

Parameters:
  • results_df (Any)

  • obs_streamflow (Any)

  • obs_snow (Any)

  • use_snow (bool)

  • output_dir (Path)

  • experiment_id (str)

Return type:

None

visualize_hype_results(sim_flow, obs_flow, outlet_id, domain_name, experiment_id, project_dir)[source]#

Visualize HYPE streamflow comparison.

Parameters:
  • sim_flow (Any)

  • obs_flow (Any)

  • outlet_id (str)

  • domain_name (str)

  • experiment_id (str)

  • project_dir (Path)

Return type:

None

visualize_model_results(model_name, **kwargs)[source]#

Visualize model results using registry-based dispatch.

Parameters:

model_name (str)

Return type:

Any | None

visualize_timeseries_results()[source]#

Visualize timeseries results from the standard results file.

Return type:

None

visualize_benchmarks(benchmark_results)[source]#

Visualize benchmark results.

Parameters:

benchmark_results (Dict[str, Any])

Return type:

List[str]

visualize_snow_comparison(model_outputs)[source]#

Visualize snow comparison.

Parameters:

model_outputs (List[List[str]])

Return type:

Dict[str, Any]

visualize_optimization_progress(history, output_dir, calibration_variable, metric)[source]#

Visualize optimization progress.

Parameters:
  • history (List[Dict])

  • output_dir (Path)

  • calibration_variable (str)

  • metric (str)

Return type:

None

visualize_optimization_depth_parameters(history, output_dir)[source]#

Visualize depth parameter evolution.

Parameters:
  • history (List[Dict])

  • output_dir (Path)

Return type:

None

visualize_sensitivity_analysis(sensitivity_data, output_file, plot_type='single')[source]#

Visualize sensitivity analysis results.

Parameters:
  • sensitivity_data (Any)

  • output_file (Path)

  • plot_type (str)

Return type:

None

visualize_decision_impacts(results_file, output_folder)[source]#

Visualize decision analysis impacts.

Parameters:
  • results_file (Path)

  • output_folder (Path)

Return type:

None

visualize_hydrographs_with_highlight(results_file, simulation_results, observed_streamflow, decision_options, output_folder, metric='kge')[source]#

Visualize hydrographs with top performers highlighted.

Parameters:
  • results_file (Path)

  • simulation_results (Dict)

  • observed_streamflow (Any)

  • decision_options (Dict)

  • output_folder (Path)

  • metric (str)

Return type:

None

visualize_drop_analysis(drop_data, optimal_threshold, project_dir)[source]#

Visualize drop analysis for stream threshold selection.

Parameters:
  • drop_data (List[Dict])

  • optimal_threshold (float)

  • project_dir (Path)

Return type:

None

generate_model_comparison_overview(experiment_id=None, context='run_model')[source]#

Generate model comparison overview for all models with valid output.

Parameters:
  • experiment_id (str | None)

  • context (str)

Return type:

str | None

visualize_calibration_results(experiment_id=None, calibration_target=None)[source]#

Generate comprehensive post-calibration visualizations.

Parameters:
  • experiment_id (str | None)

  • calibration_target (str | None)

Return type:

Dict[str, str]

diagnostic_domain_definition(basin_gdf, dem_path=None)[source]#

Generate diagnostic plots for domain definition step.

Parameters:
  • basin_gdf (Any)

  • dem_path (Path | None)

Return type:

str | None

diagnostic_discretization(hru_gdf, method)[source]#

Generate diagnostic plots for discretization step.

Parameters:
  • hru_gdf (Any)

  • method (str)

Return type:

str | None

diagnostic_observations(obs_df, obs_type)[source]#

Generate diagnostic plots for observation processing step.

Parameters:
  • obs_df (Any)

  • obs_type (str)

Return type:

str | None

diagnostic_forcing_raw(forcing_nc, domain_shp=None)[source]#

Generate diagnostic plots for raw forcing acquisition step.

Parameters:
  • forcing_nc (Path)

  • domain_shp (Path | None)

Return type:

str | None

diagnostic_forcing_remapped(raw_nc, remapped_nc, hru_shp=None)[source]#

Generate diagnostic plots for forcing remapping step.

Parameters:
  • raw_nc (Path)

  • remapped_nc (Path)

  • hru_shp (Path | None)

Return type:

str | None

diagnostic_model_preprocessing(input_dir, model_name)[source]#

Generate diagnostic plots for model preprocessing step.

Parameters:
  • input_dir (Path)

  • model_name (str)

Return type:

str | None

diagnostic_model_output(output_nc, model_name)[source]#

Generate diagnostic plots for model output step.

Parameters:
  • output_nc (Path)

  • model_name (str)

Return type:

str | None

diagnostic_attributes(dem_path=None, soil_path=None, land_path=None)[source]#

Generate diagnostic plots for attribute acquisition step.

Parameters:
  • dem_path (Path | None)

  • soil_path (Path | None)

  • land_path (Path | None)

Return type:

str | None

diagnostic_calibration(history=None, best_params=None, obs_vs_sim=None, model_name='Unknown')[source]#

Generate diagnostic plots for calibration step.

Parameters:
  • history (List[Dict] | None)

  • best_params (Dict[str, float] | None)

  • obs_vs_sim (Dict[str, Any] | None)

  • model_name (str)

Return type:

str | None

diagnostic_coupling_conservation(graph, output_dir=None)[source]#

Generate conservation diagnostic for a coupled model run.

Parameters:
  • graph (Any)

  • output_dir (Path | None)

Return type:

str | None
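Each diagnostic_* method above is typed as returning str | None. The following is a minimal sketch of collecting those results. It assumes, which the reference does not state, that the string is the path of the saved plot and that None means the plot was skipped. The helper collect_plot_paths and the stand-in callables are hypothetical and are not part of ReportingManager.

```python
from typing import Callable, Dict, Optional


def collect_plot_paths(steps: Dict[str, Callable[[], Optional[str]]]) -> Dict[str, str]:
    """Run each diagnostic callable and keep only the results that are not None."""
    saved: Dict[str, str] = {}
    for step_name, make_plot in steps.items():
        plot_path = make_plot()
        if plot_path is not None:
            saved[step_name] = plot_path
    return saved


# Stand-in callables; in practice each would wrap a call such as
# rm.diagnostic_observations(obs_df, obs_type) on a ReportingManager instance.
steps = {
    "observations": lambda: "diagnostics/observations.png",
    "forcing_raw": lambda: None,  # assumed meaning: the plot was skipped
}
plot_paths = collect_plot_paths(steps)
```

Filtering out None in one place keeps downstream code from having to re-check every diagnostic result.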

Visualization Methods:

from symfluence.reporting.reporting_manager import ReportingManager

rm = ReportingManager(config, logger)

# Generate domain map
rm.plot_domain_map()

# Generate hydrograph
rm.plot_hydrograph(observed, simulated)

# Generate calibration convergence plot
rm.plot_calibration_convergence(results)

# Generate sensitivity analysis plot
rm.visualize_sensitivity_analysis(sensitivity_results)

Configuration#

SymfluenceConfig#

Configuration management facade for SYMFLUENCE.

Provides high-level access to configuration loading, normalization, validation, and type-safe configuration models. Acts as the public API for external code and CLI commands that need to work with SYMFLUENCE configurations.

Exports:

  • SymfluenceConfig: Type-safe configuration model with hierarchical access

  • ensure_typed_config: Adapter to convert dict configs to SymfluenceConfig

  • normalize_config: Normalize and apply aliases to configuration dictionaries

  • validate_config: Validate configuration against schema

class symfluence.core.config.SymfluenceConfig(*args, **kwargs)[source]#

Bases: BaseModel

Hierarchical root configuration model for SYMFLUENCE.

Organizes 346+ configuration parameters into logical nested sections:
  • system: System settings (paths, logging, MPI)

  • domain: Domain definition (timing, spatial extent, discretization)

  • forcing: Meteorological forcing data

  • model: Hydrological model configurations

  • optimization: Calibration and optimization settings

  • evaluation: Evaluation data and analysis

  • paths: File paths and directories

Features:
  • Type-safe hierarchical access: config.domain.name vs config['DOMAIN_NAME']

  • Factory methods: from_preset(), from_minimal(), from_file()

  • Backward compatibility: to_dict(), get(), __getitem__()

  • Immutable after creation (frozen=True) to prevent mutation bugs

  • All validation logic preserved from original flat model

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any
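The hierarchical, frozen access pattern listed under Features can be illustrated without the library. SymfluenceConfig itself is a Pydantic BaseModel; the sketch below uses standard-library frozen dataclasses purely as a stand-in, so the class names DomainSection and RootConfig and the exception type are assumptions of the example, not the library's behaviour.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class DomainSection:
    """Stand-in for one nested configuration section."""
    name: str


@dataclass(frozen=True)
class RootConfig:
    """Stand-in for a root model that groups nested sections."""
    domain: DomainSection


cfg = RootConfig(domain=DomainSection(name="my_basin"))
domain_name = cfg.domain.name  # hierarchical, attribute-style access

try:
    cfg.domain.name = "other_basin"  # assignment on a frozen instance
    mutation_blocked = False
except FrozenInstanceError:
    mutation_blocked = True
```

With a frozen model, changing a value means building a new configuration (for example through the factory methods with overrides) rather than mutating one that other components already hold.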

validate_time_periods()#

Validate that time periods make logical sense
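As a rough illustration of what "logical sense" could mean for a time period, the sketch below checks that a start timestamp precedes an end timestamp. The timestamp format follows the EXPERIMENT_TIME_START values shown elsewhere in this reference; the function check_time_period is hypothetical and the actual validator may apply different or additional rules.

```python
from datetime import datetime

TIME_FORMAT = "%Y-%m-%d %H:%M"  # matches values such as '2020-01-01 00:00'


def check_time_period(start: str, end: str) -> None:
    """Raise ValueError unless start is strictly earlier than end."""
    start_dt = datetime.strptime(start, TIME_FORMAT)
    end_dt = datetime.strptime(end, TIME_FORMAT)
    if start_dt >= end_dt:
        raise ValueError(f"Period start {start!r} must precede end {end!r}")


check_time_period("2020-01-01 00:00", "2020-12-31 23:00")  # passes silently
```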

validate_coordinates()#

Validate coordinate formats and bounds
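A sketch of a format-and-bounds check for a pour point string such as '51.17/-115.57' (the value used in the from_minimal example). It assumes the order is latitude/longitude, which the example value suggests but the reference does not state; parse_pour_point is a hypothetical helper, not the library's validator.

```python
from typing import Tuple


def parse_pour_point(coords: str) -> Tuple[float, float]:
    """Parse a 'lat/lon' string and check both values against global bounds."""
    parts = coords.split("/")
    if len(parts) != 2:
        raise ValueError(f"Expected 'lat/lon', got {coords!r}")
    lat, lon = float(parts[0]), float(parts[1])
    if not -90.0 <= lat <= 90.0:
        raise ValueError(f"Latitude {lat} is outside [-90, 90]")
    if not -180.0 <= lon <= 180.0:
        raise ValueError(f"Longitude {lon} is outside [-180, 180]")
    return lat, lon


lat, lon = parse_pour_point("51.17/-115.57")
```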

validate_model_requirements()#

Validate model-specific required fields based on HYDROLOGICAL_MODEL.

Delegates to the model config-resolution helpers for all model-specific validation.

validate_spatial_mode_consistency()#

Validate and auto-align spatial modes with domain definition

validate_optimization_configuration()#

Validate optimization algorithm and parameter settings

to_dict(flatten=True)[source]#

Convert configuration to dictionary.

Parameters:

flatten (bool) – If True, returns flat dict with uppercase keys (legacy format) If False, returns nested dict structure

Returns:

Configuration as dictionary

Return type:

Dict[str, Any]

Example

>>> config = SymfluenceConfig.from_preset('fuse-basic')
>>> flat_dict = config.to_dict(flatten=True)
>>> flat_dict['DOMAIN_NAME']
'my_basin'
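To make the flat/nested distinction concrete, here is a sketch of one possible flattening rule: join the section name and field name with an underscore and uppercase the result. This rule is an assumption for illustration; the library defines its own key mapping, which need not follow it (the reference mentions HYDROLOGICAL_MODEL, for instance, without a section prefix). flatten_config is a hypothetical helper.

```python
from typing import Any, Dict


def flatten_config(nested: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested sections into uppercase keys joined by underscores."""
    flat: Dict[str, Any] = {}
    for key, value in nested.items():
        flat_key = f"{prefix}_{key}".upper() if prefix else key.upper()
        if isinstance(value, dict):
            # Recurse into the section, carrying the accumulated prefix along
            flat.update(flatten_config(value, flat_key))
        else:
            flat[flat_key] = value
    return flat


nested = {"domain": {"name": "my_basin"}}
flat = flatten_config(nested)
```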
get(key, default=None)[source]#

Dict-like get method for backward compatibility.

Supports both flat keys (‘DOMAIN_NAME’) and dotted paths (‘domain.name’).

Parameters:
  • key (str) – Configuration key (uppercase) or dotted path

  • default (Any) – Default value if key not found

Returns:

Configuration value or default

Return type:

Any

Example

>>> config.get('DOMAIN_NAME')
'my_basin'
>>> config.get('NONEXISTENT', 'fallback')
'fallback'
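get() is documented as accepting both flat keys and dotted paths. The sketch below shows one way such a dual lookup can work over plain dictionaries; the function lookup and the two sample dictionaries are hypothetical and do not reproduce the library's resolution logic.

```python
from typing import Any, Dict


def lookup(nested: Dict[str, Any], flat: Dict[str, Any], key: str, default: Any = None) -> Any:
    """Resolve a flat uppercase key first, then try the key as a dotted path."""
    if key in flat:
        return flat[key]
    node: Any = nested
    for part in key.split("."):
        # Stop as soon as the path leaves the nested structure
        if not isinstance(node, dict) or part not in node:
            return default
        node = node[part]
    return node


nested = {"domain": {"name": "my_basin"}}
flat = {"DOMAIN_NAME": "my_basin"}

by_flat_key = lookup(nested, flat, "DOMAIN_NAME")
by_dotted_path = lookup(nested, flat, "domain.name")
missing = lookup(nested, flat, "NONEXISTENT", "fallback")
```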
__getitem__(key)[source]#

Dict-like bracket access for backward compatibility.

Parameters:

key (str) – Configuration key (uppercase)

Returns:

Configuration value

Raises:

KeyError – If key not found

Return type:

Any

Example

>>> config['DOMAIN_NAME']
'my_basin'

__contains__(key)[source]#

Check if key exists in configuration.

Parameters:

key (str)

Return type:

bool

__getattr__(name)[source]#

Provide attribute-style access for legacy flat keys.

Parameters:

name (str)

Return type:

Any

classmethod from_file(path, overrides=None, *, use_env=True, validate=True)[source]#

Load configuration from a YAML file and apply the layered loading hierarchy.

Loading precedence (highest to lowest):
  1. CLI overrides (programmatic)
  2. Environment variables (SYMFLUENCE_*)
  3. Config file (YAML)
  4. Defaults from nested models

Parameters:
  • path (Path) – Path to configuration YAML file

  • overrides (Dict[str, Any] | None) – Dictionary of CLI/programmatic overrides

  • use_env (bool) – Whether to load environment variables (default: True)

  • validate (bool) – Whether to validate using Pydantic (default: True)

Returns:

Validated SymfluenceConfig instance

Raises:
  • ConfigurationError – If configuration is invalid

  • FileNotFoundError – If config file is missing

Return type:

SymfluenceConfig

Example

>>> config = SymfluenceConfig.from_file(
...     'config.yaml',
...     overrides={'DEBUG_MODE': True}
... )
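The loading precedence described for from_file can be sketched with collections.ChainMap, where the first mapping wins. The function layer_settings is hypothetical, and the mapping from SYMFLUENCE_* environment variable names to configuration keys (prefix stripped, remainder used as the key) is an assumption; the library may name, parse, and coerce environment values differently.

```python
from collections import ChainMap
from typing import Any, Dict, Mapping

ENV_PREFIX = "SYMFLUENCE_"


def layer_settings(
    defaults: Mapping[str, Any],
    file_values: Mapping[str, Any],
    environ: Mapping[str, str],
    overrides: Mapping[str, Any],
) -> Dict[str, Any]:
    """Merge four layers so that earlier ChainMap entries win over later ones."""
    env_values = {
        name[len(ENV_PREFIX):]: value
        for name, value in environ.items()
        if name.startswith(ENV_PREFIX)
    }
    # Highest precedence first: overrides, environment, file, defaults
    return dict(ChainMap(dict(overrides), env_values, dict(file_values), dict(defaults)))


merged = layer_settings(
    defaults={"DEBUG_MODE": False, "NUM_PROCESSES": 1},
    file_values={"DOMAIN_NAME": "my_basin", "NUM_PROCESSES": 4},
    environ={"SYMFLUENCE_NUM_PROCESSES": "8", "HOME": "/home/user"},
    overrides={"DEBUG_MODE": True},
)
```

Note that the environment value stays a string in this sketch; a real loader would need a type-coercion step before validation.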
classmethod from_preset(preset_name, **overrides)[source]#

Create configuration from a named preset.

Parameters:
  • preset_name (str) – Name of preset (‘fuse-provo’, ‘summa-basic’, etc.)

  • **overrides – Additional overrides to apply on top of preset

Returns:

Fully validated SymfluenceConfig instance

Return type:

SymfluenceConfig

Example

>>> config = SymfluenceConfig.from_preset(
...     'fuse-provo',
...     DOMAIN_NAME='my_basin',
...     EXPERIMENT_TIME_START='2020-01-01 00:00'
... )

classmethod from_minimal(domain_name, model, forcing_dataset='ERA5', **overrides)[source]#

Create minimal viable configuration for quick setup.

Automatically applies sensible defaults based on model choice.

Parameters:
  • domain_name (str) – Name for the domain/basin

  • model (str) – Hydrological model (‘SUMMA’, ‘FUSE’, ‘GR’, etc.)

  • forcing_dataset (str) – Forcing data source (default: ‘ERA5’)

  • **overrides – Additional configuration overrides

Returns:

Validated SymfluenceConfig with minimal required fields

Return type:

SymfluenceConfig

Example

>>> config = SymfluenceConfig.from_minimal(
...     domain_name='test_basin',
...     model='SUMMA',
...     POUR_POINT_COORDS='51.17/-115.57',
...     EXPERIMENT_TIME_START='2020-01-01 00:00',
...     EXPERIMENT_TIME_END='2020-12-31 23:00'
... )

symfluence.core.config.ensure_config(config)[source]#

Convert dict to SymfluenceConfig if needed.

This function centralizes the config coercion pattern that was previously duplicated across 60+ files. It uses a runtime import to avoid circular dependency issues while providing type safety via TYPE_CHECKING.

Parameters:

config (Dict[str, Any] | SymfluenceConfig) – Configuration as dict or SymfluenceConfig instance

Returns:

SymfluenceConfig instance

Raises:

TypeError – If config is neither a dict nor SymfluenceConfig

Return type:

SymfluenceConfig

Example

>>> from symfluence.core.config import ensure_config
>>> cfg = ensure_config({'DOMAIN_NAME': 'test_domain', ...})
>>> isinstance(cfg, SymfluenceConfig)
True

symfluence.core.config.coerce_config(config, strict=None, warn=True)[source]#

Convert dict to SymfluenceConfig if possible, with configurable fallback.

Similar to ensure_config but can return the original dict if conversion fails (e.g., for partial configs in tests). This maintains backward compatibility with code that may pass incomplete configuration dictionaries.

Parameters:
  • config (Dict[str, Any] | SymfluenceConfig) – Configuration as dict or SymfluenceConfig instance

  • strict (bool | None) – If True, raise an error instead of falling back to dict. If None (the default), the SYMFLUENCE_STRICT_CONFIG environment variable decides.

  • warn (bool) – If True (default), emit DeprecationWarning when falling back to dict. Set to False for tests or cases where dict fallback is intentional.

Returns:

SymfluenceConfig if conversion succeeds, original dict otherwise (unless strict=True, which raises on failure)

Raises:
  • TypeError – If strict=True and config cannot be converted

  • ValueError – If strict=True and config validation fails

Return type:

SymfluenceConfig | Dict[str, Any]

Example

>>> # Full config converts to SymfluenceConfig
>>> cfg = coerce_config({'DOMAIN_NAME': 'test', ...})
>>>
>>> # Partial config in tests falls back to dict (with warning)
>>> partial = coerce_config({'some_key': 'value'})
>>> isinstance(partial, dict)
True
>>>
>>> # Strict mode raises instead of falling back
>>> coerce_config({'invalid': 'config'}, strict=True)  # Raises!

Note

The fallback behavior is deprecated and will be removed in a future version. Use ensure_config() for strict conversion, or pass warn=False if you intentionally need dict fallback (e.g., in tests).
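The strict/fallback behaviour described for coerce_config follows a common pattern, sketched here with a stand-in converter. Everything in the block is hypothetical: coerce_with_fallback and require_domain_name are illustration-only names, and the way SYMFLUENCE_STRICT_CONFIG is interpreted ("1", "true", "yes" meaning strict) is an assumption, not documented behaviour.

```python
import os
import warnings
from typing import Any, Callable, Dict, Optional


def coerce_with_fallback(
    config: Dict[str, Any],
    convert: Callable[[Dict[str, Any]], Any],
    strict: Optional[bool] = None,
    warn: bool = True,
) -> Any:
    """Try to convert; on failure either re-raise (strict) or return the dict."""
    if strict is None:
        # Assumed parsing of the environment flag
        flag = os.environ.get("SYMFLUENCE_STRICT_CONFIG", "")
        strict = flag.lower() in {"1", "true", "yes"}
    try:
        return convert(config)
    except (TypeError, ValueError):
        if strict:
            raise
        if warn:
            warnings.warn("Falling back to dict config", DeprecationWarning, stacklevel=2)
        return config


def require_domain_name(config: Dict[str, Any]) -> Any:
    """Stand-in converter that rejects configs without DOMAIN_NAME."""
    if "DOMAIN_NAME" not in config:
        raise ValueError("DOMAIN_NAME is required")
    return ("typed", config["DOMAIN_NAME"])


partial = coerce_with_fallback(
    {"some_key": "value"}, require_domain_name, strict=False, warn=False
)
```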

symfluence.core.config.ensure_typed_config(config)[source]#

Ensure configuration is a SymfluenceConfig instance.

This adapter function converts dict configs to SymfluenceConfig if needed. Use this when interfacing with external code that may pass dict configs.

Parameters:

config (Dict[str, Any] | SymfluenceConfig) – Configuration as dict or SymfluenceConfig

Returns:

SymfluenceConfig instance

Return type:

SymfluenceConfig

Example

>>> config = ensure_typed_config({'DOMAIN_NAME': 'test', ...})
>>> isinstance(config, SymfluenceConfig)
True

symfluence.core.config.normalize_config(config)[source]#

Normalize configuration keys using aliases and perform type coercion.

Parameters:

config (Dict[str, Any]) – Dictionary of configuration settings

Returns:

New dictionary with normalized keys and coerced values

Return type:

Dict[str, Any]
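A sketch of what alias normalization with type coercion can look like. The alias table, the boolean coercion rule, and the function normalize_keys are all hypothetical; the real aliases and coercions are defined by the library and are not listed in this reference.

```python
from typing import Any, Dict

# Hypothetical alias table: alternative key -> canonical key
ALIASES = {"DOMAIN": "DOMAIN_NAME", "MODEL": "HYDROLOGICAL_MODEL"}


def normalize_keys(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict with canonical uppercase keys and coerced booleans."""
    normalized: Dict[str, Any] = {}
    for key, value in config.items():
        canonical = key.upper()
        canonical = ALIASES.get(canonical, canonical)
        # Coerce the strings 'true'/'false' (any case) to real booleans
        if isinstance(value, str) and value.strip().lower() in {"true", "false"}:
            value = value.strip().lower() == "true"
        normalized[canonical] = value
    return normalized


raw = {"domain": "my_basin", "DEBUG_MODE": "true"}
normalized = normalize_keys(raw)
```

Like the documented function, the sketch returns a new dictionary and leaves its input unchanged.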

symfluence.core.config.validate_config(config)[source]#

Validate configuration using Pydantic model.

Parameters:

config (Dict[str, Any]) – Dictionary of configuration settings

Returns:

Validated configuration dictionary

Raises:

ValueError – If configuration is invalid

Return type:

Dict[str, Any]

Loading and Using Configuration:

from symfluence.core.config import SymfluenceConfig, ensure_typed_config

# Load from file
config = SymfluenceConfig.from_file("config.yaml")

# From dictionary
config = SymfluenceConfig(**config_dict)

# Ensure typed config (for mixed dict/config inputs)
config = ensure_typed_config(maybe_dict_or_config)

# Access configuration values
domain = config.domain.name
model = config.model.hydrological_model

# Convert to dictionary
flat_dict = config.to_dict(flatten=True)

Utilities#

Path Management#

from symfluence.data.path_manager import PathManager

pm = PathManager(config)

# Access standard paths
project_dir = pm.project_dir
forcing_dir = pm.forcing_dir
simulations_dir = pm.simulations_dir
observations_dir = pm.observations_dir

Logging#

from symfluence.project.logging_manager import LoggingManager

# Initialize logging
log_mgr = LoggingManager(config)
logger = log_mgr.get_logger("my_module")

# Log messages
logger.info("Processing started")
logger.warning("Optional data not found")
logger.error("Critical failure")

Error Handling#

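from symfluence import SYMFLUENCE
from symfluence.core.exceptions import (
    SymfluenceError,           # Base exception
    ConfigurationError,        # Config issues
    DataAcquisitionError,      # Data download failures
    ModelExecutionError,       # Model run failures
    ValidationError,           # Validation failures
)

conf = SYMFLUENCE("config.yaml")

# Catch specific exceptions first, then fall back to the base class
try:
    conf.run_workflow()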
except ConfigurationError as e:
    print(f"Configuration problem: {e}")
except ModelExecutionError as e:
    print(f"Model failed: {e}")
except SymfluenceError as e:
    print(f"General error: {e}")
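The order of the except clauses above matters: Python uses the first clause that matches, so the base class must come last. The sketch below demonstrates this with local stand-in classes. They mirror the names in the import list but are defined here only for illustration, and the assumption that the specific exceptions subclass SymfluenceError rests on its "Base exception" comment.

```python
class SymfluenceError(Exception):
    """Stand-in for the base exception."""


class ConfigurationError(SymfluenceError):
    """Stand-in for configuration failures."""


class ModelExecutionError(SymfluenceError):
    """Stand-in for model run failures."""


def describe_failure(exc: Exception) -> str:
    """Map an exception to a message, checking specific types before the base."""
    try:
        raise exc
    except ConfigurationError as e:
        return f"Configuration problem: {e}"
    except ModelExecutionError as e:
        return f"Model failed: {e}"
    except SymfluenceError as e:
        return f"General error: {e}"


config_message = describe_failure(ConfigurationError("missing DOMAIN_NAME"))
general_message = describe_failure(SymfluenceError("unexpected state"))
```

If the SymfluenceError clause were listed first, it would also catch ConfigurationError and ModelExecutionError, and the more specific messages would never be produced.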

Advanced Usage#

Custom Workflow#

from symfluence import SYMFLUENCE

conf = SYMFLUENCE("config.yaml")

# Run subset of steps
conf.setup_project()
conf.define_domain()

# Skip to model execution (assumes data exists)
conf.preprocess_models()
conf.run_models()

# Custom post-processing
results = conf.postprocess_results()

# Access internal managers
model_mgr = conf.managers['model']
data_mgr = conf.managers['data']

Parallel Execution#

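from symfluence import SYMFLUENCE
from symfluence.core.config import SymfluenceConfig

# Configure in YAML
# NUM_PROCESSES: 8
# PARALLEL_CALIBRATION: true

# Or programmatically, starting from a flat (legacy-format) dictionary
config_dict = SymfluenceConfig.from_file("config.yaml").to_dict(flatten=True)
config_dict['NUM_PROCESSES'] = 8
config_dict['PARALLEL_CALIBRATION'] = True

conf = SYMFLUENCE(config_dict)
conf.calibrate_model()  # Uses parallel execution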

Batch Processing#

from symfluence import SYMFLUENCE
from pathlib import Path

# Process multiple domains in a deterministic (sorted) order
config_files = sorted(Path("configs/").glob("*.yaml"))

for config_file in config_files:
    print(f"Processing {config_file.name}")
    conf = SYMFLUENCE(str(config_file))
    conf.run_workflow()
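The loop above stops at the first configuration whose workflow raises. A sketch of a variant that records failures and continues with the remaining files follows. run_batch and fake_run are hypothetical helpers; fake_run stands in for SYMFLUENCE(str(config_file)).run_workflow() so that the example runs without the library.

```python
from pathlib import Path
from typing import Callable, Dict, Iterable


def run_batch(config_files: Iterable[Path], run_one: Callable[[Path], None]) -> Dict[str, str]:
    """Run every configuration and return a mapping of file name to failure message."""
    failures: Dict[str, str] = {}
    for config_file in sorted(config_files):
        try:
            run_one(config_file)
        except Exception as exc:  # in real use, SymfluenceError would be narrower
            failures[config_file.name] = str(exc)
    return failures


def fake_run(config_file: Path) -> None:
    """Stand-in for SYMFLUENCE(str(config_file)).run_workflow()."""
    if config_file.stem == "broken":
        raise RuntimeError("model failed")


failures = run_batch([Path("configs/a.yaml"), Path("configs/broken.yaml")], fake_run)
```

Returning the failures instead of printing them lets the caller decide whether to retry, log, or abort.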

References#