API Reference#
Overview#
The SYMFLUENCE Python API provides programmatic access to the full workflow, from project setup through calibration and analysis. The primary entry point is the SYMFLUENCE class, which coordinates manager components through the WorkflowOrchestrator.
Quick Start#
Basic Usage#
from symfluence import SYMFLUENCE
# Initialize from configuration file
conf = SYMFLUENCE("my_project.yaml")
# Run complete workflow
conf.run_workflow()
# Or run individual steps
conf.setup_project()
conf.define_domain()
conf.acquire_forcings()
conf.run_models()
Step-by-Step Execution#
from symfluence import SYMFLUENCE
# Initialize
conf = SYMFLUENCE("config.yaml")
# 1. Project Setup
conf.setup_project()
conf.create_pour_point()
# 2. Domain Definition
conf.acquire_attributes()
conf.define_domain()
conf.discretize_domain()
# 3. Data Acquisition
conf.process_observed_data()
conf.acquire_forcings()
conf.run_model_agnostic_preprocessing()
# 4. Model Execution
conf.preprocess_models()
conf.run_models()
conf.postprocess_results()
# 5. Calibration (optional)
conf.calibrate_model()
# 6. Analysis
conf.run_benchmarking()
conf.run_sensitivity_analysis()
Configuration Access#
from symfluence import SYMFLUENCE
from symfluence.core.config import SymfluenceConfig
# Load typed configuration directly
config = SymfluenceConfig.from_file("config.yaml")
# Access typed attributes
print(f"Domain: {config.domain.name}")
print(f"Model: {config.model.hydrological_model}")
print(f"Start: {config.experiment.time_start}")
# Initialize SYMFLUENCE with typed config
conf = SYMFLUENCE(config)
Core API#
SYMFLUENCE (Main Class)#
The primary interface for all SYMFLUENCE operations.
- class symfluence.core.system.SYMFLUENCE(config_input, config_overrides=None, debug_mode=False, visualize=False, diagnostic=False)[source]#
Bases: object
Enhanced SYMFLUENCE main class with comprehensive CLI support.
This class serves as the central coordinator for all SYMFLUENCE operations, with enhanced CLI capabilities including individual step execution, pour point setup, SLURM job submission, and comprehensive workflow management.
- Parameters:
config_input (Path | str | SymfluenceConfig)
config_overrides (Dict[str, Any])
debug_mode (bool)
visualize (bool)
diagnostic (bool)
- __init__(config_input, config_overrides=None, debug_mode=False, visualize=False, diagnostic=False)[source]#
Initialize the SYMFLUENCE system with configuration and CLI options.
- Parameters:
config_input (Path | str | SymfluenceConfig) – Path to the configuration file or a SymfluenceConfig instance
config_overrides (Dict[str, Any]) – Dictionary of configuration overrides from CLI
debug_mode (bool) – Whether to enable debug mode
visualize (bool) – Whether to enable visualization
diagnostic (bool) – Whether to enable diagnostic plots for workflow validation
- run_workflow(force_run=None)[source]#
Execute the complete SYMFLUENCE workflow (CLI wrapper).
- Parameters:
force_run (bool | None)
- Return type:
None
- run_individual_steps(step_names)[source]#
Execute specific workflow steps by name.
Allows selective execution of individual workflow steps rather than running the complete pipeline. Useful for debugging, testing, or re-running specific portions of the workflow.
- Parameters:
step_names (List[str]) – List of step names to execute (e.g., ['setup_project', 'calibrate_model'])
- Return type:
None
- get_workflow_status()[source]#
Return workflow completion status from the orchestrator.
- Returns:
Workflow status payload with step_details and counts.
- Return type:
Dict[str, Any]
Initialization:
from symfluence import SYMFLUENCE
# From YAML file path
conf = SYMFLUENCE("path/to/config.yaml")
# From SymfluenceConfig object
from symfluence.core.config import SymfluenceConfig
config = SymfluenceConfig.from_file("config.yaml")
conf = SYMFLUENCE(config)
# From dictionary
config_dict = {"DOMAIN_NAME": "test", ...}
conf = SYMFLUENCE(config_dict)
Core Methods:
# Full workflow execution
conf.run_workflow(force_run=False)
# Workflow status
status = conf.get_workflow_status()
# Returns: {
# "total_steps": 15,
# "completed_steps": 8,
# "pending_steps": 7,
# "step_details": [...]
# }
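The status payload can be condensed into a progress line for logs or notebooks. The following is a minimal sketch that only relies on the keys shown in the example above; `summarize_status` is a hypothetical helper, not part of the SYMFLUENCE API, and the contents of `step_details` are left empty because their structure is not described here.

```python
from typing import Any, Dict


def summarize_status(status: Dict[str, Any]) -> str:
    """Format a one-line progress summary from a workflow status payload."""
    total = status["total_steps"]
    done = status["completed_steps"]
    # Guard against an empty workflow to avoid dividing by zero.
    percent = 100.0 * done / total if total else 0.0
    return f"{done}/{total} steps complete ({percent:.0f}%)"


# Payload shaped like the documented example (step_details left empty).
example_status = {
    "total_steps": 15,
    "completed_steps": 8,
    "pending_steps": 7,
    "step_details": [],
}
print(summarize_status(example_status))
```

In real use the dictionary would come from `conf.get_workflow_status()` rather than being written by hand.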
Manager Classes#
SYMFLUENCE uses a manager-based architecture where each major subsystem has a dedicated manager class.
Project Manager#
Handles project initialization and structure.
Project management for SYMFLUENCE hydrological modeling setups.
Handles project directory structure creation, pour point generation, and project metadata management for hydrological model domains.
- class symfluence.project.project_manager.ProjectManager(config, logger)[source]#
Bases: ConfigurableMixin
Manages project-level operations including directory structure and initialization.
The ProjectManager is responsible for creating and managing the project directory structure, handling pour point creation, and maintaining project metadata. It serves as the foundation for all other SYMFLUENCE components by establishing the physical file organization that the workflow depends on.
Key responsibilities:
- Creating the project directory structure
- Generating pour point shapefiles from coordinates
- Validating project structure integrity
- Providing project metadata to other components
- Parameters:
config (SymfluenceConfig)
logger (Logger)
- config#
Configuration dictionary containing project settings
- Type:
Dict[str, Any]
- logger#
Logger instance for recording operations
- Type:
logging.Logger
- __init__(config, logger)[source]#
Initialize the ProjectManager.
- Parameters:
config (SymfluenceConfig) – SymfluenceConfig instance
logger (Logger) – Logger instance
- Raises:
TypeError – If config is not a SymfluenceConfig instance
- setup_project()[source]#
Set up the project directory structure.
Creates the main project directory and all required subdirectories. Project layout (canonical, post-2026):
{project_dir}/
    shapefiles/{pour_point, catchment, river_network, river_basins}/
    data/
        attributes/      <- DEM, soil, landclass, etc.
        forcing/         <- raw + merged + basin-averaged forcing
        observations/    <- streamflow, snotel, etc.
        model_ready/     <- model-agnostic store
The data/ prefix is created up-front so that resolve_data_subdir consistently resolves to the new layout for fresh projects. Previously, setup_project created the legacy attributes/ directory directly, which made resolve_data_subdir pick the legacy path on subsequent reads. Some downstream callers (e.g. TauDEM in define_domain) construct the path from string templates anchored at data/attributes/... and then failed to find the DEM that had been written into the legacy attributes/... tree.
Legacy projects with a pre-existing attributes/ directory continue to work via the backward-compat branch in resolve_data_subdir; this change only affects fresh setup_project runs.
- Returns:
Path to the created project directory
- Return type:
Path
- Raises:
OSError – If directory creation fails due to permission or disk space issues
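To make the directory layout concrete, here is a small sketch that builds the same tree with `pathlib` inside a temporary directory. It is an illustration of the documented layout only; `create_layout` and the two name tuples are hypothetical and do not reproduce the actual `setup_project()` implementation.

```python
import tempfile
from pathlib import Path

# Subdirectory names taken from the documented project layout.
SHAPEFILE_DIRS = ("pour_point", "catchment", "river_network", "river_basins")
DATA_DIRS = ("attributes", "forcing", "observations", "model_ready")


def create_layout(project_dir: Path) -> Path:
    """Create the shapefiles/ and data/ subtrees under project_dir."""
    for name in SHAPEFILE_DIRS:
        (project_dir / "shapefiles" / name).mkdir(parents=True, exist_ok=True)
    for name in DATA_DIRS:
        (project_dir / "data" / name).mkdir(parents=True, exist_ok=True)
    return project_dir


with tempfile.TemporaryDirectory() as tmp:
    root = create_layout(Path(tmp) / "domain_example")
    # Collect every created directory relative to the project root.
    created = sorted(
        p.relative_to(root).as_posix() for p in root.rglob("*") if p.is_dir()
    )
    print(created)
```

`mkdir` raises `OSError` (for example `PermissionError`) when a directory cannot be created, which matches the exception type documented above.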
- create_pour_point()[source]#
Create pour point shapefile from coordinates if specified.
If pour point coordinates are specified in the configuration, creates a GeoDataFrame with a single point geometry and saves it as a shapefile at the appropriate location. If ‘default’ is specified, assumes a user-provided pour point shapefile exists.
- Returns:
- Path to the created pour point shapefile if successful,
None if using a user-provided shapefile or if creation fails
- Return type:
Optional[Path]
- Raises:
ValueError – If the pour point coordinates are in an invalid format
Exception – For other errors during shapefile creation
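The branching described for `create_pour_point()` (coordinates, `'default'`, or an invalid value) can be sketched as a small parser. This section does not state the coordinate format, so the sketch assumes a `"lat/lon"` string; `parse_pour_point` is a hypothetical helper and the real validation may differ.

```python
from typing import Optional, Tuple


def parse_pour_point(value: str) -> Optional[Tuple[float, float]]:
    """Parse an assumed 'lat/lon' string; return None for 'default'."""
    if value.strip().lower() == "default":
        # A user-provided shapefile is expected, so there is nothing to parse.
        return None
    parts = value.split("/")
    if len(parts) != 2:
        raise ValueError(f"Expected 'lat/lon', got {value!r}")
    # float() raises ValueError itself for non-numeric parts.
    lat, lon = (float(part) for part in parts)
    if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
        raise ValueError(f"Coordinates out of range: {value!r}")
    return lat, lon


print(parse_pour_point("51.17/-115.57"))
print(parse_pour_point("default"))
```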
- get_project_info()[source]#
Get information about the project configuration.
Collects key project metadata into a dictionary for reporting, logging, or providing status information to other components.
The returned information includes:
- Domain name
- Experiment ID
- Project directory path
- Data directory path
- Pour point coordinates
- Returns:
Dictionary containing project information
- Return type:
Dict[str, Any]
Key Methods:
from symfluence.project.project_manager import ProjectManager
pm = ProjectManager(config, logger)
# Setup project directory structure
pm.setup_project()
# Create pour point from coordinates
pm.create_pour_point()
# Get project information
info = pm.get_project_info()
Domain Manager#
Manages domain definition and discretization.
Domain management facade for SYMFLUENCE geospatial operations.
Coordinates domain definition, delineation, and discretization workflows with integrated visualization and artifact tracking.
- class symfluence.geospatial.domain_manager.DomainManager(config, logger, reporting_manager=None)[source]#
Bases: ConfigurableMixin
Orchestrates all geospatial domain operations for hydrological modeling setup.
This manager coordinates domain definition (delineation), spatial discretization, and artifact tracking for hydrological modeling workflows. It provides a unified interface for creating HRU (Hydrologic Response Unit) configurations from various spatial data sources and discretization strategies.
- Architecture:
Facade Pattern - Coordinates specialized geospatial services:
- DomainDelineator: Watershed boundary extraction and network topology
- DomainDiscretizationRunner: HRU creation via spatial disaggregation
- Artifact Tracking: Maintains references to all created shapefiles
- Visualization Integration: Optional reporting for QA/QC
- Domain Definition Methods:
- point:
Creates square bounding box domain from coordinates
Use case: FLUXNET sites, point-scale modeling
Output: Single polygon shapefile
- lumped:
Single-basin watershed delineation from pour point
Use case: Traditional lumped hydrological modeling
Output: Single watershed polygon + optional delineated routing network
Special: Supports lumped-to-distributed routing workflow
With subset_from_geofabric=True: Dissolves geofabric basins to single polygon
- semidistributed:
Full TauDEM-based watershed delineation from DEM
Use case: Detailed distributed modeling with subcatchments
Output: River network + subcatchment polygons
Optional: Coastal watershed handling
With subset_from_geofabric=True: Extracts from existing geofabric
- distributed:
Regular grid domain with D8 flow direction
Use case: Grid-based land surface models (VIC, MESH, CLM)
Output: Grid cells as both HRUs and routing segments
grid_source='generate': Create grid from bounding box
grid_source='native': Match forcing data resolution
- Discretization Strategies:
- lumped:
Single HRU representing entire basin
No spatial disaggregation
- elevation:
Elevation bands (e.g., 100m intervals)
Use case: Snow modeling, orographic effects
- landclass:
Land cover types (forest, urban, agriculture, etc.)
Use case: Land surface heterogeneity
- soilclass:
Soil classification types
Use case: Infiltration and runoff variability
- aspect:
Slope aspect classes (N, NE, E, SE, S, SW, W, NW)
Use case: Solar radiation and snow redistribution
- radiation:
Potential solar radiation classes
Use case: Energy balance modeling
- combined:
Multiple attributes combined (e.g., elevation × landclass)
Use case: Capturing complex spatial heterogeneity
Handles attribute interactions and MultiPolygons
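As an illustration of the elevation and combined strategies, the sketch below assigns cells to 100 m elevation bands and then groups them by (band, land class). The cell data and the `elevation_band` helper are made up for the example; the actual discretization operates on rasters and shapefiles rather than Python lists.

```python
import math
from collections import defaultdict
from typing import Dict, List, Tuple


def elevation_band(elevation_m: float, band_size_m: float = 100.0) -> int:
    """Return the index of the elevation band containing elevation_m."""
    return int(math.floor(elevation_m / band_size_m))


# Hypothetical cells: (elevation in metres, land class label).
cells: List[Tuple[float, str]] = [
    (1450.0, "forest"),
    (1499.0, "forest"),
    (1510.0, "forest"),
    (1520.0, "grass"),
]

# "combined" strategy: one HRU per unique (elevation band, land class) pair.
hru_groups: Dict[Tuple[int, str], List[int]] = defaultdict(list)
for cell_index, (elevation, land_class) in enumerate(cells):
    hru_groups[(elevation_band(elevation), land_class)].append(cell_index)

for key, members in sorted(hru_groups.items()):
    print(key, members)
```

Combining attributes multiplies the number of possible HRUs, which is why the combined strategy can produce many small units.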
- Workflow Sequence:
define_domain(): Create/extract watershed boundaries → Produces DelineationArtifacts (river basins, network, pour point)
discretize_domain(): Subdivide into HRUs → Produces DiscretizationArtifacts (HRU shapefile, attributes)
Visualization (optional): Spatial QA/QC plots → Generated via reporting_manager if available
- Artifact Tracking:
- DelineationArtifacts:
method: Domain definition method used
river_basins_path: Path to basin shapefile
river_network_path: Path to river network shapefile
pour_point_path: Path to pour point shapefile
metadata: Additional delineation metadata
- DiscretizationArtifacts:
method: Discretization method used
hru_shapefile_path: Path to HRU shapefile
attributes: HRU attributes DataFrame
statistics: Discretization statistics (HRU count, min/max areas)
- Configuration Dependencies:
- Domain Definition:
DOMAIN_DEFINITION_METHOD: point/lumped/semidistributed/distributed
DOMAIN_NAME: Basin identifier
SUBSET_FROM_GEOFABRIC: Extract from existing geofabric (default: False)
GRID_SOURCE (distributed): 'generate' or 'native'
NATIVE_GRID_DATASET (distributed + native): Dataset identifier (default: 'era5')
POUR_POINT_SHP_PATH (lumped/semidistributed): Pour point location
RIVER_NETWORK_SHP_PATH (subset): Existing river network
DOMAIN_BOUNDING_BOX: Bbox coordinates
GRID_CELL_SIZE (distributed): Grid spacing in meters
- Discretization:
SUB_GRID_DISCRETIZATION: Discretization method
DEM_PATH: Elevation data (for elevation/aspect/radiation)
LAND_CLASS_PATH: Land cover data (for landclass)
SOIL_CLASS_PATH: Soil data (for soilclass)
ELEVATION_BAND_SIZE: Band interval in meters (default: 100)
- Delineation (TauDEM):
DELINEATE_COASTAL_WATERSHEDS: Coastal handling (True/False)
ROUTING_DELINEATION: Routing network strategy
- Output Files:
Shapefiles are created in project_dir/shapefiles/.
Delineation outputs: river_basins, river_network, pour_point shapefiles.
Discretization outputs: catchment HRU shapefiles.
Example structure:
shapefiles/
├── river_basins/
│   └── bow_river_riverBasins_lumped.shp
├── river_network/
│   └── bow_river_riverNetwork_lumped.shp
├── pour_point/
│   └── bow_river_pourPoint.shp
└── catchment/
    └── bow_river_HRUs_elevation.shp
- Special Workflows:
- Lumped-to-Distributed Routing:
Define lumped domain (single watershed polygon)
Internally delineate subcatchments within lumped domain
Create area-weighted remapping (lumped HRU to distributed routing)
Enables distributed routing with lumped hydrology
- Coastal Watershed Delineation:
Special handling for basins draining to ocean
Avoids river network artifacts at coastline
Uses modified TauDEM workflow
- Grid-Based Distributed:
Creates regular grid cells
Assigns D8 flow direction from DEM
Detects and fixes routing cycles
Each cell is both HRU and routing segment
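Detecting routing cycles amounts to finding loops in a "cell to downstream cell" mapping. The sketch below shows one generic way to do this with a path walk; it assumes the network is given as a dictionary of integer IDs with `None` marking an outlet, and it is not the detection code used by SYMFLUENCE.

```python
from typing import Dict, List, Optional

ON_PATH, DONE = 1, 2


def find_cycles(downstream: Dict[int, Optional[int]]) -> List[List[int]]:
    """Return every cycle in a cell -> downstream-cell mapping."""
    state: Dict[int, int] = {}
    cycles: List[List[int]] = []
    for start in downstream:
        path: List[int] = []
        node: Optional[int] = start
        # Follow the flow direction until an outlet or a visited cell is hit.
        while node is not None and node in downstream and node not in state:
            state[node] = ON_PATH
            path.append(node)
            node = downstream[node]
        # Reaching a cell that is still on the current path closes a loop.
        if node is not None and state.get(node) == ON_PATH:
            cycles.append(path[path.index(node):])
        for visited in path:
            state[visited] = DONE
    return cycles


# Cells 1-3 drain to an outlet; cells 4-6 form a loop.
network = {1: 2, 2: 3, 3: None, 4: 5, 5: 6, 6: 4}
print(find_cycles(network))
```

Each cell is visited once, so the walk is linear in the number of grid cells.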
- Visualization Integration:
If reporting_manager is available:
- Delineation: Watershed boundary maps, river network plots
- Discretization: HRU spatial distribution, attribute histograms
- QA/QC: Identifies potential issues (small HRUs, disconnected polygons)
- Error Handling:
Validates configuration before execution
Raises descriptive errors for missing required shapefiles
Logs warnings for non-critical issues
Provides context for TauDEM failures
- Example Workflow:
>>> from symfluence.geospatial.domain_manager import DomainManager
>>> config = SymfluenceConfig.from_file('config.yaml')
>>> logger = setup_logger()
>>> reporting = ReportingManager(config, logger)
>>>
>>> # Initialize manager
>>> domain_mgr = DomainManager(config, logger, reporting)
>>>
>>> # Define watershed boundaries
>>> domain_mgr.define_domain()
>>> print(domain_mgr.delineation_artifacts.river_basins_path)
# ./shapefiles/river_basins/bow_river_riverBasins_lumped.shp
>>>
>>> # Discretize into elevation bands
>>> domain_mgr.discretize_domain()
>>> print(domain_mgr.discretization_artifacts.statistics)
# {'hru_count': 8, 'min_area_km2': 120.5, 'max_area_km2': 450.2}
- Performance Considerations:
Delineation: ~1-30 minutes (depends on DEM resolution, TauDEM)
Discretization: ~10 seconds - 5 minutes (depends on attribute resolution)
Grid generation: ~1-10 minutes (depends on grid cell count)
Memory: Peak during raster operations (~2-8 GB for high-res DEMs)
Notes
DomainDelineator initialized eagerly
DomainDiscretizationRunner initialized lazily when needed
Artifacts tracked for downstream workflows (preprocessing, modeling)
Reporting integration provides visual validation
Supports both simple (lumped) and complex (combined attributes) setups
See also
geospatial.delineation.DomainDelineator: Watershed delineation
geospatial.discretization.DomainDiscretizationRunner: HRU creation
geospatial.discretization.core.DomainDiscretizer: Discretization engine
geospatial.geofabric: Geofabric delineation backends
- Parameters:
config (SymfluenceConfig)
logger (Logger)
reporting_manager (Any | None)
- __init__(config, logger, reporting_manager=None)[source]#
Initialize the Domain Manager.
- Parameters:
config (SymfluenceConfig) – SymfluenceConfig instance
logger (Logger) – Logger instance
reporting_manager (Any | None) – ReportingManager instance
- Raises:
TypeError – If config is not a SymfluenceConfig instance
- create_point_domain_shapefile()[source]#
Create a square basin shapefile from bounding box coordinates for point modelling.
This method creates a rectangular polygon from the BOUNDING_BOX_COORDS and saves it as a shapefile for point-based modelling approaches.
- Returns:
Path to the created shapefile or None if failed
- Return type:
Path | None
- define_domain()[source]#
Define the domain using the configured method.
- Returns:
Tuple of the domain result and delineation artifacts
- Return type:
Tuple[Path | Tuple[Path, Path] | None, DelineationArtifacts]
- discretize_domain()[source]#
Discretize the domain into HRUs or GRUs.
- Returns:
Tuple of HRU shapefile(s) and discretization artifacts
- Return type:
Tuple[Path | dict | None, DiscretizationArtifacts]
- visualize_domain()[source]#
Create visualization of the domain.
- Returns:
Path to the created plot or None if failed
- Return type:
Path | None
- visualize_discretized_domain()[source]#
Create visualization of the discretized domain.
- Returns:
Path to the created plot or None if failed
- Return type:
Path | None
- get_domain_info()[source]#
Get information about the current domain configuration.
- Returns:
Dictionary containing domain information
- Return type:
Dict[str, Any]
- validate_domain_configuration()[source]#
Validate domain configuration settings.
Deprecated: Use validate_readiness() instead. Config-level checks (required keys, definition method, bounding box format) are now handled by Pydantic validators at config construction time.
- Returns:
True if configuration is valid, False otherwise
- Return type:
bool
- validate_readiness()[source]#
Validate that this manager is ready for execution.
Checks runtime prerequisites that Pydantic cannot verify at config construction time: subset geofabric config and grid source for distributed method.
- Returns:
Dict mapping check names to pass/fail booleans.
- Return type:
Dict[str, bool]
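Because `validate_readiness()` returns a mapping of check names to booleans, callers can reduce it to a list of failed checks before deciding whether to proceed. The sketch below assumes only that return shape; the check names in the example and the `failed_checks` helper are hypothetical.

```python
from typing import Dict, List


def failed_checks(readiness: Dict[str, bool]) -> List[str]:
    """Return the names of readiness checks that did not pass, sorted."""
    return sorted(name for name, passed in readiness.items() if not passed)


# Hypothetical result; real check names come from validate_readiness().
example_readiness = {"subset_geofabric_config": True, "grid_source": False}
failures = failed_checks(example_readiness)
if failures:
    print("Not ready, failed checks:", ", ".join(failures))
else:
    print("All readiness checks passed")
```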
Key Methods:
from symfluence.geospatial.domain_manager import DomainManager
dm = DomainManager(config, logger)
# Define domain boundaries
dm.define_domain()
# Discretize into HRUs/GRUs
dm.discretize_domain()
# Get domain information
info = dm.get_domain_info()
Data Manager#
Coordinates data acquisition and preprocessing.
Data Manager
Facade that coordinates acquisition, observation processing, and model-agnostic
preprocessing. Keeps orchestration thin while services handle the heavy
lifting. See docs under docs/source/configuration and docs/source/data
for full workflows.
- class symfluence.data.data_manager.DataManager(config, logger, reporting_manager=None)[source]#
Bases: BaseManager
Facade that orchestrates acquisition, preprocessing, and observation handling.
Delegates to acquisition/preprocessing services and registries; keeps runtime imports slim. Detailed behaviour lives in the docs.
- Parameters:
config (SymfluenceConfig)
logger (Logger)
reporting_manager (Any | None)
- acquire_attributes()[source]#
Acquire geospatial attributes (DEM, soil classes, land cover) for the domain.
Downloads and processes required geospatial data layers including elevation, soil classification, and land cover data from configured data sources.
- acquire_forcings()[source]#
Acquire meteorological forcing data for the simulation period.
Downloads forcing variables (precipitation, temperature, radiation, etc.) from the configured forcing dataset (ERA5, RDRS, CARRA, etc.) for the specified temporal domain.
- acquire_observations()[source]#
Acquire observational data for model calibration and validation.
Downloads streamflow observations, snow measurements, and other validation data from configured observation sources (USGS, WSC, SNOTEL, etc.).
- acquire_em_earth_forcings()[source]#
Acquire EM-Earth supplementary forcing data.
Downloads and processes EM-Earth reanalysis data for gap-filling or supplementing primary forcing datasets.
- process_observed_data()[source]#
Process observed data including streamflow and additional variables.
- Raises:
DataAcquisitionError – If data processing fails
- run_model_agnostic_preprocessing()[source]#
Run model-agnostic preprocessing including basin averaging and resampling.
- Raises:
DataAcquisitionError – If preprocessing fails
- build_model_ready_store()[source]#
Build or refresh the model-ready data store.
Creates CF-1.8 compliant NetCDF files for forcings, observations, and attributes in data/model_ready/.
- validate_data_directories()[source]#
Validate that required data directories exist.
Deprecated: Use validate_readiness() instead.
- Return type:
bool
Key Methods:
from symfluence.data.data_manager import DataManager
data_mgr = DataManager(config, logger)
# Acquire geospatial attributes
data_mgr.acquire_attributes()
# Process observed streamflow data
data_mgr.process_observed_data()
# Acquire forcing data (ERA5, RDRS, etc.)
data_mgr.acquire_forcings()
# Run model-agnostic preprocessing
data_mgr.run_model_agnostic_preprocessing()
Model Manager#
Coordinates model preprocessing, execution, and postprocessing.
Model Manager
Lightweight facade that resolves model workflows, runs preprocess/execute/
postprocess/visualize steps, and delegates component lookups to
ModelRegistry. Detailed behaviour now lives in the docs (see
docs/source/architecture and docs/source/models/*).
- class symfluence.models.model_manager.ModelManager(config, logger, reporting_manager=None)[source]#
Bases: BaseManager
Facade that turns a hydrological model list into an ordered workflow.
Resolves routing dependencies, then runs preprocess → execute → postprocess → visualize phases using components pulled from ModelRegistry. Full behaviour is covered in the docs; keeping this concise speeds imports.
- Parameters:
config (SymfluenceConfig)
logger (Logger)
reporting_manager (Any | None)
- preprocess_models(params=None)[source]#
Preprocess forcing data into model-specific input formats.
Transforms generic forcing data (from data acquisition) into model-specific input formats. Invokes registered preprocessor for each model in resolved workflow. Preprocessors handle all model-specific input requirements.
- Preprocessing Workflow:
1. Resolve model workflow (includes implicit dependencies)
2. For each model in workflow:
   a. Create model input directory (project_dir/forcing/{MODEL}_input/)
   b. Retrieve preprocessor class from ModelRegistry
   c. Instantiate preprocessor with config, logger, and params
   d. Run preprocessor.run_preprocessing()
3. Preprocessor outputs go to model-specific input directories
- Model-Specific Preprocessing Examples:
SUMMA:
- ERA5 NetCDF → SUMMA forcing file format
- Time step interpolation/aggregation
- Unit conversion (SI → SUMMA units)
- Spatial interpolation to model grid
FUSE:
- Catchment-averaged forcing extraction
- Temporal aggregation to model timestep
- Unit conversion
GR (Rainfall-Runoff):
- Daily precipitation, temperature aggregation
- Missing value handling
mizuRoute:
- Basin delineation and network structure
- Unit hydrograph parameters
- Routing network initialization
- Parameter Usage:
The params dict is passed to the preprocessor for calibration scenarios:
- The preprocessor may use params to adjust input processing
- Example: Parameter-dependent unit conversion or scaling
- If the preprocessor doesn't accept params, they are ignored
- Parameters:
params (Dict[str, Any] | None) – Optional dictionary of parameter values, e.g. {'SAI_SV': 0.3, 'snowCriticalTemp': -1.5}. Used for calibration (different parameter values → different inputs). If None, default parameter values from the config are used. Whether params are passed is determined by introspecting the preprocessor.
- Raises:
Exception – If preprocessing fails for any model. The error is caught internally, logged with a full traceback to aid debugging, and re-raised.
- Side Effects:
Creates project_dir/forcing/{MODEL}_input/ directories
Generates model-specific input files
Logs preprocessing progress and errors to logger
Examples
>>> # Standard preprocessing with default parameters
>>> manager.preprocess_models()
>>> # Preprocessing with parameter variations (calibration)
>>> params = {'param1': 0.5, 'param2': 100.0}
>>> manager.preprocess_models(params=params)
Notes
LSTM and similar data-driven models skip preprocessing
Registry lookup enables new models without modifying this method
Parameter introspection (inspect.signature) handles optional params
Errors in preprocessing halt workflow and raise exception
See also
ModelRegistry.get_preprocessor(): Retrieve preprocessor class
run_models(): Execute preprocessed models
postprocess_results(): Extract and standardize results
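The notes on parameter introspection mention `inspect.signature`. A minimal sketch of that technique follows: it checks whether a preprocessor's constructor has a `params` argument and only passes it when it does. The two preprocessor classes and `build_preprocessor` are hypothetical stand-ins, not the classes registered in ModelRegistry.

```python
import inspect
from typing import Any, Dict, Optional


class PlainPreprocessor:
    """Stand-in preprocessor whose constructor does not accept params."""

    def __init__(self, config: Dict[str, Any], logger: Any) -> None:
        self.params: Optional[Dict[str, Any]] = None


class ParamAwarePreprocessor:
    """Stand-in preprocessor that accepts calibration params."""

    def __init__(self, config: Dict[str, Any], logger: Any, params=None) -> None:
        self.params = params


def build_preprocessor(cls, config, logger, params=None):
    """Instantiate cls, passing params only if its constructor accepts them."""
    accepts_params = "params" in inspect.signature(cls.__init__).parameters
    if accepts_params:
        return cls(config, logger, params=params)
    # Constructors without a params argument silently ignore the values.
    return cls(config, logger)


trial = {"SAI_SV": 0.3}
print(build_preprocessor(PlainPreprocessor, {}, None, trial).params)
print(build_preprocessor(ParamAwarePreprocessor, {}, None, trial).params)
```

This keeps a single call site for all models while letting each preprocessor decide whether calibration parameters are relevant to it.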
- run_models()[source]#
Execute models in resolved workflow order.
Prefers dCoupler graph-based execution when available for multi-model workflows (provides conservation checking, spatial remapping, and unit conversion). Falls back to sequential registry-based execution when dCoupler is not installed or graph execution fails.
See also
ModelRegistry.get_runner(): Retrieve runner class
ModelRegistry.get_runner_method(): Get method name
preprocess_models(): Prepare inputs
postprocess_results(): Extract and standardize outputs
- postprocess_results()[source]#
Post-process model results using the registry.
Extracts streamflow and other relevant outputs from model-specific result files and converts them to a standardized format for evaluation and comparison. After postprocessing, calculates and logs baseline performance metrics.
The standardized interface expects postprocessors to implement an extract_streamflow() method, which saves results to project_dir/results/{experiment_id}_results.csv.
Note
Automatically triggers visualization of timeseries results after extraction. Falls back to legacy extract_results() method for backward compatibility.
- log_baseline_performance()[source]#
Log baseline model performance metrics before calibration.
Calculates and logs performance metrics comparing simulated vs observed streamflow after the initial model run. Provides a diagnostic snapshot of model performance before calibration, enabling users to:
1. Assess initial model setup quality
2. Detect configuration issues
3. Establish baseline for improvement assessment
4. Identify models needing attention before calibration
Metrics Calculated:
KGE (Kling-Gupta Efficiency): KGE = 1 - sqrt((r-1)² + (α-1)² + (β-1)²), where r is the correlation coefficient, α is the ratio of simulated to observed standard deviation, and β is the ratio of simulated to observed mean. Range: (-∞, 1]. KGE >= 0.7 indicates reasonable performance, 0.5 <= KGE < 0.7 requires calibration, and KGE < 0.5 needs significant improvement. Using the observed mean as the prediction gives KGE = 1 - sqrt(2) ≈ -0.41, so values below that are worse than the mean-flow benchmark.
KGE' (Modified KGE): Symmetric variant for metric comparison. Useful when comparing multiple model configurations.
NSE (Nash-Sutcliffe Efficiency): NSE = 1 - (Σ(Qobs-Qsim)² / Σ(Qobs-Qmean)²), where Qmean is the mean of the observations. Squared-error-based metric with range (-∞, 1]; NSE < 0 is worse than using the observed mean. Less sensitive to bias and variability than KGE.
Bias (%): Bias = ((Mean_Sim - Mean_Obs) / Mean_Obs) × 100. Positive values indicate that the model overestimates, negative values that it underestimates. Can indicate systematic model errors.
- Data Sources:
Simulation Results: results_dir/{experiment_id}_results.csv
- Generated by postprocess_results()
- Contains model discharge columns
Observations: Multiple fallback strategies
a. Results file column (if 'obs' or 'observed' in column name)
b. Observations directory: project_dir/observations/streamflow/preprocessed/
c. External observation file with datetime and discharge columns
- Workflow:
Load simulation results CSV (index=datetime)
Find observation column or load from observations directory
For each simulation column (e.g., 'SUMMA_discharge_cms'):
a. Align observations and simulation by datetime index
b. Remove NaN pairs
c. Calculate metrics (KGE, KGE', NSE, Bias)
d. Log results with interpretation
Log footer with metrics summary
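The alignment and NaN-removal steps of this workflow can be sketched without pandas by treating each series as a timestamp-keyed dictionary. This is an illustration only: `align_pairs` is a hypothetical helper, the series values are made up, and the 10-point threshold is taken from the error-handling notes for this method.

```python
import math
from typing import Dict, List, Tuple

MIN_VALID_POINTS = 10  # fewer valid pairs than this is treated as insufficient


def align_pairs(
    obs: Dict[str, float], sim: Dict[str, float]
) -> List[Tuple[float, float]]:
    """Pair observed and simulated values on shared timestamps, dropping NaNs."""
    pairs = []
    # ISO-formatted date strings sort chronologically.
    for timestamp in sorted(obs.keys() & sim.keys()):
        observed, simulated = obs[timestamp], sim[timestamp]
        if not (math.isnan(observed) or math.isnan(simulated)):
            pairs.append((observed, simulated))
    return pairs


obs_series = {"2020-01-01": 1.0, "2020-01-02": float("nan"), "2020-01-03": 3.0}
sim_series = {
    "2020-01-01": 1.2,
    "2020-01-02": 2.1,
    "2020-01-03": 2.8,
    "2020-01-04": 4.0,
}
valid_pairs = align_pairs(obs_series, sim_series)
has_enough_data = len(valid_pairs) >= MIN_VALID_POINTS
print(valid_pairs, has_enough_data)
```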
Output Format:
========================================================
BASELINE MODEL PERFORMANCE (before calibration)
========================================================
MODELNAME:
  KGE = 0.7234
  KGE' = 0.7156
  NSE = 0.6987
  Bias = +5.3%
  Valid data points: 1825
Note: KGE >= 0.7 indicates reasonable baseline performance
========================================================
- Error Handling:
Results file not found: Skipped with debug message
No observations found: Skipped with debug message
Insufficient valid data (<10 points): Logged as warning
Metric calculation errors: Caught and logged as debug
- Side Effects:
Logs baseline metrics to logger.info()
Logs interpretation and recommendations
No files created or modified
Examples
>>> # Called automatically by postprocess_results()
>>> manager.postprocess_results()  # Includes baseline logging
>>> # Or called directly
>>> manager.log_baseline_performance()
Notes
Called automatically by postprocess_results()
Requires results file from postprocessing
Useful for QA/QC before calibration begins
Graceful degradation if data not available
KGE interpretation helps understand model biases
See also
postprocess_results(): Automatically calls log_baseline_performance()
kge(), kge_prime(), nse(): Metric calculation functions
evaluation.metrics: Metric library
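For readers who want to check the formulas by hand, the following pure-Python sketch implements KGE, NSE, and percent bias exactly as written above. These are standalone illustrations using population statistics, not the functions from the SYMFLUENCE metric library, whose signatures are not documented in this section.

```python
import math
from typing import Sequence


def _mean(values: Sequence[float]) -> float:
    return sum(values) / len(values)


def _std(values: Sequence[float]) -> float:
    mean = _mean(values)
    return math.sqrt(sum((v - mean) ** 2 for v in values) / len(values))


def kge(obs: Sequence[float], sim: Sequence[float]) -> float:
    """KGE = 1 - sqrt((r-1)^2 + (alpha-1)^2 + (beta-1)^2)."""
    mean_obs, mean_sim = _mean(obs), _mean(sim)
    std_obs, std_sim = _std(obs), _std(sim)
    cov = sum((o - mean_obs) * (s - mean_sim) for o, s in zip(obs, sim)) / len(obs)
    r = cov / (std_obs * std_sim)   # correlation coefficient
    alpha = std_sim / std_obs       # variability ratio
    beta = mean_sim / mean_obs      # mean ratio
    return 1.0 - math.sqrt((r - 1) ** 2 + (alpha - 1) ** 2 + (beta - 1) ** 2)


def nse(obs: Sequence[float], sim: Sequence[float]) -> float:
    """NSE = 1 - sum((obs-sim)^2) / sum((obs-mean_obs)^2)."""
    mean_obs = _mean(obs)
    numerator = sum((o - s) ** 2 for o, s in zip(obs, sim))
    denominator = sum((o - mean_obs) ** 2 for o in obs)
    return 1.0 - numerator / denominator


def percent_bias(obs: Sequence[float], sim: Sequence[float]) -> float:
    """Bias = (mean_sim - mean_obs) / mean_obs * 100."""
    return (_mean(sim) - _mean(obs)) / _mean(obs) * 100.0


observed = [1.0, 2.0, 3.0, 4.0, 5.0]
doubled = [2.0 * value for value in observed]  # perfectly correlated, 100% high
print(kge(observed, doubled), nse(observed, doubled), percent_bias(observed, doubled))
```

The doubled series has perfect correlation but poor KGE and NSE, which shows why correlation alone is not a sufficient performance measure.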
- visualize_outputs()[source]#
Visualize model outputs using registered model visualizers.
Invokes visualization functions for each primary model in the configuration. Visualizers are registered per-model and handle model-specific output formats.
Note
Requires reporting_manager to be configured. Skips visualization if not available. Each model can register its own visualization function with the ModelRegistry.
Key Methods:
from symfluence.models.model_manager import ModelManager
mm = ModelManager(config, logger)
# Preprocess for all configured models
mm.preprocess_models()
# Run model simulations
mm.run_models()
# Extract and format results
mm.postprocess_results()
# Get available models
models = mm.get_available_models()
Optimization Manager#
Handles calibration and optimization.
Optimization Manager
Coordinates calibration runs by selecting algorithms and model-specific
optimizers. Keeps orchestration lean; algorithm details and examples are in
docs/source/calibration and optimizer pages.
- class symfluence.optimization.optimization_manager.OptimizationManager(config, logger, reporting_manager=None)[source]#
Bases: BaseManager
Facade over model-specific optimizers for iterative calibration.
Chooses algorithm, fetches optimizer from the registry, and runs it; heavy lifting stays in the optimizer classes. See docs for workflow details.
- Parameters:
config (SymfluenceConfig)
logger (Logger)
reporting_manager (Any | None)
- property optimizers: Any#
Backward compatibility: expose registered optimizers/algorithms.
- run_optimization_workflow()[source]#
Run main optimization workflow based on configuration.
Entry point for complete optimization process. Checks configuration to determine which optimization methods to execute and runs them in sequence. Currently supports ‘iteration’ (calibration) and handles deprecated method warnings.
- Workflow:
Check config.optimization.methods (list of methods)
For each enabled method:
- 'iteration': Run calibrate_model() for iterative optimization
- Deprecated methods: Log warnings
Return results from all executed methods
- Configuration:
optimization.methods: List of optimization methods
- Example: ['iteration'] enables calibration
- Example: ['iteration', 'emulation'] runs calibration and logs a deprecation warning for 'emulation'
- Supported Methods:
'iteration': Iterative parameter optimization (calibration)
- Deprecated Methods (logged as warnings):
'differentiable_parameter_emulation': Use gradient-based (ADAM/LBFGS) instead
'emulation': Use model emulation libraries instead
- Returns:
Results from completed workflows
- Keys: Method names (e.g., ‘calibration’)
- Values: Path to results file as string, or None if failed
- Example: {‘calibration’: ‘/path/to/results.csv’}
- Return type:
Dict[str, Any]
- Side Effects:
Logs method execution and warnings to logger
Calls calibrate_model() if ‘iteration’ enabled
May create results files and directories
Examples
>>> # Standard workflow
>>> opt_mgr = OptimizationManager(config, logger)
>>> results = opt_mgr.run_optimization_workflow()
>>> if 'calibration' in results:
...     print(f"Calibration results: {results['calibration']}")
>>> # With deprecated method (warning logged)
>>> # config.optimization.methods = ['iteration', 'emulation']
>>> results = opt_mgr.run_optimization_workflow()
>>> # Logs warning about 'emulation' being deprecated
Notes
Only ‘iteration’ currently implemented
Deprecated methods logged but not executed
Non-empty results dict indicates at least one method ran
Empty dict means no methods were enabled or all failed
See also
calibrate_model(): Run iterative optimization
get_optimization_status(): Check optimization configuration
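The dispatch behaviour described for run_optimization_workflow() can be sketched as follows. This is a minimal illustration, not the SYMFLUENCE source: the function name dispatch_methods, its arguments, and the handling of unknown method names are assumptions. Only the method names and the ‘calibration’ result key come from the docstring above.

```python
import logging
from typing import Any, Callable, Dict, List, Optional

# Method names taken from the docstring above; the dispatch code itself is a
# hypothetical illustration, not the SYMFLUENCE implementation.
DEPRECATED_METHODS = {"differentiable_parameter_emulation", "emulation"}


def dispatch_methods(
    methods: List[str],
    calibrate: Callable[[], Optional[str]],
    logger: logging.Logger,
) -> Dict[str, Any]:
    """Run 'iteration' if enabled; warn about deprecated methods and skip them."""
    results: Dict[str, Any] = {}
    for method in methods:
        if method == "iteration":
            # Stored under 'calibration', matching the documented example.
            results["calibration"] = calibrate()
        elif method in DEPRECATED_METHODS:
            logger.warning("Optimization method %r is deprecated; skipping", method)
        else:
            # Assumption: unknown names are also skipped with a warning.
            logger.warning("Unknown optimization method %r; skipping", method)
    return results


demo_logger = logging.getLogger("dispatch_demo")
demo_results = dispatch_methods(
    ["iteration", "emulation"],
    calibrate=lambda: "/path/to/results.csv",
    logger=demo_logger,
)
```

An empty returned dict then corresponds to the documented case where no method was enabled.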
- calibrate_model()[source]#
Calibrate model(s) using configured optimization algorithm.
Coordinates iterative parameter optimization for one or more hydrological models using the registry-based unified optimizer infrastructure. Handles configuration validation, optimizer instantiation, algorithm selection, and execution.
- Calibration Workflow:
Check if ‘iteration’ is in config.optimization.methods
- If not, log info and return None (disabled)
Get algorithm from config.optimization.algorithm (default: ‘PSO’)
- Supported: DDS, ASYNC_DDS, PSO, DE, SCE-UA, NSGA-II, ADAM, LBFGS
Parse configured hydrological models (config.model.hydrological_model)
- Comma-separated list, e.g., ‘SUMMA,FUSE’
- Upper-case normalization
For each model:
a. Call _calibrate_with_registry(model, algorithm)
b. Collect results
c. Log completion
Return the result of the last calibrated model (results from multiple models are not aggregated)
- Algorithm Selection:
Via config.optimization.algorithm:
- DDS, ASYNC-DDS, ASYNCDDS, ASYNC_DDS: Dynamically Dimensioned Search
- PSO: Particle Swarm Optimization
- SCE-UA: Shuffled Complex Evolution
- DE: Differential Evolution
- NSGA-II: Multi-objective non-dominated sorting
- ADAM: Gradient-based with adaptive moments
- LBFGS: Gradient-based quasi-Newton method
- Registry-Based Model Optimization:
Each model uses a model-specific optimizer from OptimizerRegistry:
- OptimizerRegistry.get_optimizer(‘MODELNAME’) returns the optimizer class
- The optimizer class inherits from BaseModelOptimizer
- Example: SUMMAOptimizer for SUMMA calibration
- Configuration Parameters:
- Workflow Control:
optimization.methods: Must contain ‘iteration’
optimization.algorithm: Algorithm name (PSO, DDS, ADAM, etc.)
- Model Selection:
model.hydrological_model: Comma-separated model names
- Algorithm-Specific (if applicable):
optimization.adam_steps: Number of steps (default: 100)
optimization.adam_learning_rate: Learning rate (default: 0.01)
optimization.lbfgs_steps: Max steps (default: 50)
optimization.lbfgs_learning_rate: Step size (default: 0.1)
- Returns:
Path to last completed calibration results file
- None if: disabled, no models configured, or all failed
- Path if: at least one model calibration completed successfully
- Typically: project_dir/optimization/{model}_{algorithm}_results.csv
- Return type:
Optional[Path]
- Raises:
Nothing is raised to the caller. The following errors are caught internally, and None is returned instead:
- Registry lookup failures
- Optimizer instantiation errors
- Algorithm execution failures
- Side Effects:
Creates project_dir/optimization/ directory
Generates model-specific results files
Logs calibration progress and status to logger
Modifies reporting_manager state (if configured)
Examples
>>> # Single model with DDS
>>> opt_mgr = OptimizationManager(config, logger)
>>> results_path = opt_mgr.calibrate_model()
>>> if results_path:
...     print(f"Calibration completed: {results_path}")
>>> # Multiple models (SUMMA + FUSE)
>>> # config.model.hydrological_model = 'SUMMA,FUSE'
>>> results_path = opt_mgr.calibrate_model()  # Returns FUSE results
>>> # Disabled calibration
>>> # config.optimization.methods = ['forward'] (no 'iteration')
>>> results_path = opt_mgr.calibrate_model()
>>> assert results_path is None
Notes
Disabled silently returns None (no error)
Registry lookup errors logged and skipped
Execution errors caught and logged (non-fatal)
Multiple models: Last result returned (not aggregated)
See also
_calibrate_with_registry(): Registry-based optimizer execution
run_optimization_workflow(): Top-level workflow coordinator
OptimizerRegistry: Registry for model-specific optimizers
BaseModelOptimizer: Base class for model optimizers
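The model parsing, alias handling, and "last result wins" behaviour described for calibrate_model() can be sketched as below. All function names here (parse_models, normalize_algorithm, calibrate_all) are hypothetical, and collapsing the ASYNC-DDS spellings into a single ‘ASYNC_DDS’ key is an assumption made for illustration. The real method delegates to _calibrate_with_registry() and the OptimizerRegistry.

```python
from pathlib import Path
from typing import Callable, List, Optional

# Alias spellings listed in the docstring above; mapping them to one canonical
# key is an assumption of this sketch.
ASYNC_DDS_ALIASES = {"ASYNC-DDS", "ASYNCDDS", "ASYNC_DDS"}


def parse_models(hydrological_model: str) -> List[str]:
    """Split a comma-separated model string and upper-case each name."""
    return [m.strip().upper() for m in hydrological_model.split(",") if m.strip()]


def normalize_algorithm(name: str) -> str:
    """Upper-case the algorithm name and fold ASYNC-DDS spellings together."""
    key = name.strip().upper()
    return "ASYNC_DDS" if key in ASYNC_DDS_ALIASES else key


def calibrate_all(
    models: List[str],
    algorithm: str,
    calibrate_one: Callable[[str, str], Optional[Path]],
) -> Optional[Path]:
    """Calibrate each model in turn; return the last successful path, or None."""
    last: Optional[Path] = None
    for model in models:
        try:
            path = calibrate_one(model, algorithm)
        except Exception:
            continue  # documented behaviour: errors are non-fatal and skipped
        if path is not None:
            last = path
    return last


def fake_calibrate(model: str, algorithm: str) -> Path:
    """Stand-in optimizer call that fails for FUSE to show the error path."""
    if model == "FUSE":
        raise RuntimeError("simulated failure")
    return Path(f"optimization/{model}_{algorithm}_results.csv")


demo_models = parse_models("summa, fuse")
demo_path = calibrate_all(demo_models, normalize_algorithm("async-dds"), fake_calibrate)
```

Because results are not aggregated, a caller that needs per-model paths would have to collect them itself.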
- get_optimization_status()[source]#
Get status of optimization operations.
- Returns:
Dictionary containing optimization status information
- Return type:
Dict[str, Any]
- validate_optimization_configuration()[source]#
Validate optimization configuration settings.
Deprecated: Use validate_readiness() instead. Algorithm and metric validation is now handled by Pydantic validators at config construction time.
- Returns:
Dictionary containing validation results
- Return type:
Dict[str, bool]
- validate_readiness()[source]#
Validate that this manager is ready for execution.
Checks runtime prerequisites that Pydantic cannot verify: whether the configured model has a registered optimizer and whether calibration parameters are defined.
- Returns:
Dict mapping check names to pass/fail booleans.
- Return type:
Dict[str, bool]
- get_available_optimizers()[source]#
Get list of available optimization algorithms.
- Returns:
Dictionary mapping algorithm identifiers to their descriptions
- Return type:
Dict[str, str]
- load_optimization_results(filename=None)[source]#
Load optimization results from file.
- Parameters:
filename (str, optional) – Name of results file to load. If None, uses the default filename based on experiment_id.
- Returns:
Dictionary with optimization results. Returns None if loading fails.
- Return type:
Optional[Dict]
Key Methods:
from symfluence.optimization.optimization_manager import OptimizationManager
opt = OptimizationManager(config, logger)
# Run full optimization workflow
results = opt.run_optimization_workflow()
# Or run calibration directly
results_path = opt.calibrate_model()
# Check optimization status
status = opt.get_optimization_status()
# Check runtime readiness (validate_optimization_configuration() is deprecated)
readiness = opt.validate_readiness()
# Get available optimizers
optimizers = opt.get_available_optimizers()
Analysis Manager#
Performs model evaluation and analysis.
Analysis management for SYMFLUENCE model evaluation workflows.
Coordinates benchmarking, sensitivity analysis, and decision analysis for evaluating hydrological model performance and parameter importance.
- class symfluence.evaluation.analysis_manager.AnalysisManager(config, logger, reporting_manager=None)[source]#
Bases:
ConfigurableMixin
Orchestrates comprehensive post-calibration analysis of model performance and sensitivity.
Central coordinator for evaluating hydrological model performance through benchmarking, sensitivity analysis, and decision analysis. Provides unified interface to investigate model behavior, parameter importance, and structural choices. Integrates with evaluation framework to generate publication-ready analysis reports and visualizations.
This class implements the Facade Pattern to manage complex analysis workflows across multiple hydrological models (SUMMA, FUSE, GR, HYPE, etc.). Enables systematic investigation of model strengths/weaknesses and parameter contributions to output uncertainty.
Key Responsibilities:
Benchmarking (run_benchmarking): Compare calibrated model against simple reference models (mean flow, seasonality model, persistence model). Purpose: Quantify value added by sophisticated model vs simplicity.
Sensitivity Analysis (run_sensitivity_analysis): Evaluate parameter importance using Morris screening, Sobol indices, or FAST methods. Purpose: Prioritize parameters for observation/data requirements.
Decision Analysis (run_decision_analysis): Assess impact of model structure choices including alternative process representations, parameter sets, and calibration targets. Purpose: Evaluate trade-offs in model complexity vs parsimony.
Visualization: Generate analysis plots via ReportingManager including performance comparisons, sensitivity indices, parameter rankings, and decision analysis trade-off plots.
Analysis Types:
- Benchmarking:
- Input:
Calibrated model results
Observed streamflow
Reference model outputs
- Process:
Run reference models (mean, seasonality, persistence)
Compute performance metrics (KGE, NSE, RMSE)
Compare against calibrated model
Calculate relative improvement
- Output:
Benchmark comparison table
Performance metrics for all models
Visualization showing performance rank
- Interpretation:
KGE(model) > KGE(mean): Model outperforms naive reference
KGE(model) < KGE(mean): Model worse than simple average (concerning!)
KGE(model) >> KGE(seasonality): Model captures dynamics beyond seasonal pattern
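A minimal sketch of the benchmark comparison, using only the Python standard library. The nse and kge helpers follow the standard Nash-Sutcliffe and Kling-Gupta (2009) definitions. They are not SYMFLUENCE functions, and the short streamflow series is made-up toy data for illustration only.

```python
import math
from statistics import fmean, pstdev
from typing import Sequence


def nse(obs: Sequence[float], sim: Sequence[float]) -> float:
    """Nash-Sutcliffe efficiency: 1 - SSE / variance of observations."""
    mean_obs = fmean(obs)
    num = sum((o - s) ** 2 for o, s in zip(obs, sim))
    den = sum((o - mean_obs) ** 2 for o in obs)
    return 1.0 - num / den


def kge(obs: Sequence[float], sim: Sequence[float]) -> float:
    """Kling-Gupta efficiency from correlation, variability and bias ratios."""
    mo, ms = fmean(obs), fmean(sim)
    so, ss = pstdev(obs), pstdev(sim)
    cov = fmean((o - mo) * (s - ms) for o, s in zip(obs, sim))
    r = cov / (so * ss)      # linear correlation
    alpha = ss / so          # variability ratio
    beta = ms / mo           # bias ratio
    return 1.0 - math.sqrt((r - 1) ** 2 + (alpha - 1) ** 2 + (beta - 1) ** 2)


# Toy data (invented for illustration): observed and simulated flow in m^3/s.
obs = [2.0, 3.0, 5.0, 4.0, 3.0, 2.5]
model_sim = [2.1, 3.2, 4.6, 4.1, 2.9, 2.4]

# Persistence benchmark: today's prediction is yesterday's observation,
# so scores are compared from the second time step onward.
obs_tail = obs[1:]
persistence_sim = obs[:-1]
model_tail = model_sim[1:]

# Mean-flow benchmark: a constant equal to the observed mean (NSE of 0).
mean_sim = [fmean(obs)] * len(obs)

model_beats_persistence = nse(obs_tail, model_tail) > nse(obs_tail, persistence_sim)
model_beats_mean = nse(obs, model_sim) > nse(obs, mean_sim)
```

KGE is undefined for a constant series because its standard deviation is zero, so the mean-flow comparison here uses NSE.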
- Sensitivity Analysis:
- Input:
Parameter ranges (bounds)
Model configuration
Model outputs and observations
- Sampling Methods:
Morris One-At-a-Time: Fast screening (on the order of hundreds of samples)
Sobol Quasi-Random: Variance-based (on the order of thousands of samples)
FAST: Spectral approach (on the order of 500 samples)
- Output:
Sensitivity indices (μ, σ, μ*, S1, ST, etc.)
Parameter ranking by importance
Parameters grouped into influential vs non-influential
- Interpretation:
mu* (modified Morris) is the average absolute sensitivity, i.e. the main effect magnitude.
S1 (Sobol 1st order) is the fraction of output variance from Xi alone.
ST (Sobol total) is the fraction of output variance involving Xi.
Non-influential parameters can be removed from calibration.
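To make mu, mu* and sigma concrete, here is a simplified one-at-a-time sketch in pure Python. It perturbs each parameter from random base points (a radial design) rather than building full Morris trajectories, so it is a simplification and not the sampling scheme SYMFLUENCE uses. The toy_model function and all names are hypothetical.

```python
import random
from statistics import fmean, pstdev
from typing import Callable, Dict, List, Sequence


def elementary_effects(
    func: Callable[[Sequence[float]], float],
    n_params: int,
    n_base_points: int = 20,
    delta: float = 0.1,
    seed: int = 0,
) -> List[Dict[str, float]]:
    """Estimate mu, mu* and sigma per parameter on the unit hypercube."""
    rng = random.Random(seed)
    effects: List[List[float]] = [[] for _ in range(n_params)]
    for _ in range(n_base_points):
        # Keep base points low enough that base + delta stays inside [0, 1].
        base = [rng.uniform(0.0, 1.0 - delta) for _ in range(n_params)]
        f_base = func(base)
        for i in range(n_params):
            stepped = list(base)
            stepped[i] += delta
            effects[i].append((func(stepped) - f_base) / delta)
    return [
        {
            "mu": fmean(ee),                         # mean effect (signed)
            "mu_star": fmean(abs(e) for e in ee),    # mean absolute effect
            "sigma": pstdev(ee),                     # spread: nonlinearity/interactions
        }
        for ee in effects
    ]


def toy_model(x: Sequence[float]) -> float:
    """Linear toy response: x[2] has no influence at all."""
    return 3.0 * x[0] - 2.0 * x[1] + 0.0 * x[2]


demo_stats = elementary_effects(toy_model, n_params=3)
# Rank parameter indices from most to least influential by mu*.
demo_ranking = sorted(range(3), key=lambda i: demo_stats[i]["mu_star"], reverse=True)
```

For this linear toy model, the third parameter has mu* of zero and would be a candidate for removal from calibration.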
- Decision Analysis:
- Input:
Multiple model configurations
Results from each configuration
Observations for validation
- Comparison Axes:
Model structure (e.g., SUMMA vs FUSE)
Process representation (e.g., 2-layer vs 3-layer soil)
Calibration target (KGE vs NSE vs RMSE)
Spatial discretization (lumped vs distributed)
- Output:
Performance metrics for each configuration
Trade-off analysis plots
Pareto frontier of non-dominated solutions
- Use Case:
Decide between 3-layer soil (more parameters, better fit) vs 2-layer (simpler, more generalizable) by comparing performance
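The "Pareto frontier of non-dominated solutions" can be illustrated with a small filter over (parameter count, score) pairs. The function pareto_front and the configuration names and numbers below are invented for illustration. They are not SYMFLUENCE outputs.

```python
from typing import Dict, List, Tuple


def pareto_front(configs: Dict[str, Tuple[int, float]]) -> List[str]:
    """Return names of configurations that no other configuration dominates.

    Each value is (n_parameters, score): fewer parameters is better,
    a higher score (e.g. KGE) is better.
    """
    front = []
    for name, (n_par, score) in configs.items():
        dominated = any(
            o_par <= n_par
            and o_score >= score
            and (o_par < n_par or o_score > score)
            for other, (o_par, o_score) in configs.items()
            if other != name
        )
        if not dominated:
            front.append(name)
    return front


# Illustrative values only: (number of calibrated parameters, KGE).
demo_configs = {
    "2-layer soil": (8, 0.78),
    "3-layer soil": (12, 0.83),
    "3-layer soil, extra routing": (15, 0.81),
}
demo_front = pareto_front(demo_configs)
```

In this toy case the third configuration is dropped because the plain 3-layer setup is both simpler and better scoring. The remaining two represent a genuine trade-off.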
Configuration Parameters:
- analysis.benchmarking.enabled: bool (default False)
Enable benchmarking analysis
- analysis.benchmarking.reference_models: list
Which reference models to include: [‘mean’, ‘seasonality’, ‘persistence’]
- analysis.sensitivity.method: str
Sensitivity method: ‘morris’, ‘sobol’, ‘fast’, ‘delsa’
- analysis.sensitivity.num_samples: int
Number of samples for sensitivity analysis
- analysis.sensitivity.parameters: list
Which parameters to analyze (subset of all model parameters)
- analysis.decision_analysis.configurations: list
List of model configurations to compare
- reporting.analysis_enabled: bool
Generate visualization plots of analysis results
Output Structure:
analysis_results/
    benchmarking/
        benchmark_comparison.csv        # Performance metrics table
        benchmark_plots/                # PNG plots
    sensitivity_analysis/
        sensitivity_indices.csv         # Sobol indices, Morris screening
        parameter_ranking.csv           # Ranked by importance
        sensitivity_plots/              # Bar charts, rankings
    decision_analysis/
        configuration_comparison.csv    # Metrics for each configuration
        tradeoff_analysis/              # Trade-off plots, Pareto frontier
Example Usage:
>>> config = SymfluenceConfig.from_file('config.yaml')
>>> logger = setup_logger()
>>> analysis_mgr = AnalysisManager(config, logger)
>>>
>>> # Run benchmarking
>>> benchmark_results = analysis_mgr.run_benchmarking()
>>> # Output: Performance comparison with mean, seasonality, persistence models
>>>
>>> # Run sensitivity analysis
>>> sensitivity_results = analysis_mgr.run_sensitivity_analysis()
>>> # Output: Parameter importance rankings
>>>
>>> # Run decision analysis
>>> decision_results = analysis_mgr.run_decision_analysis()
>>> # Output: Trade-off analysis comparing configurations
Key Methods:
- run_benchmarking() → Optional[Path]:
Execute benchmarking against reference models. Returns the path to the benchmark results, or None if benchmarking failed.
- run_sensitivity_analysis() → Optional[Dict]:
Execute parameter sensitivity analysis. Returns a dict mapping model names to sensitivity results, or None if disabled or failed.
- run_decision_analysis() → Optional[Dict]:
Execute model structure decision analysis. Returns a dict mapping model names to decision analysis results, or None if disabled or failed.
- run_all_analyses() → Dict[str, Path]:
Execute all configured analyses. Returns dict of {analysis_type: results_path}.
Performance:
Benchmarking: Minutes to hours (depends on model runtime)
Sensitivity Analysis: Hours to days (1000+ model evaluations needed)
Decision Analysis: Days+ (multiple full model runs)
Integration Points:
EvaluationRegistry: Access evaluators for metrics
OptimizationManager: Access calibrated parameter sets
ModelManager: Run reference models, configurations
ReportingManager: Generate analysis visualizations
DataManager: Access observations for validation
See also
Benchmarker: Low-level benchmarking implementation
SensitivityAnalyzer: Low-level sensitivity analysis
EvaluationRegistry: Registry of evaluation methods
ReportingManager: Visualization of analysis results
- Parameters:
config (SymfluenceConfig)
logger (Logger)
reporting_manager (Any | None)
- config#
Configuration dictionary
- Type:
Dict[str, Any]
- logger#
Logger instance
- Type:
logging.Logger
- __init__(config, logger, reporting_manager=None)[source]#
Initialize the Analysis Manager.
- Parameters:
config (SymfluenceConfig) – SymfluenceConfig instance
logger (Logger) – Logger instance
reporting_manager (Any | None) – ReportingManager instance
- Raises:
TypeError – If config is not a SymfluenceConfig instance
- run_benchmarking()[source]#
Run benchmarking analysis to evaluate model performance against reference models.
Benchmarking compares the performance of sophisticated hydrological models against simple reference models (e.g., mean flow, seasonality model) to quantify the value added by the model’s complexity. This process includes:
Preprocessing observed data for the benchmark period
Running simple benchmark models (e.g., mean, seasonality, persistence)
Computing performance metrics for each benchmark
Visualizing benchmark results for comparison
Benchmarking provides a baseline for evaluating model performance and helps identify the minimum acceptable performance for a given watershed.
- Returns:
Path to benchmark results file or None if benchmarking failed
- Return type:
Optional[Path]
- Raises:
FileNotFoundError – If required observation data is missing
ValueError – If date ranges are invalid
Exception – For other errors during benchmarking
- run_sensitivity_analysis()[source]#
Run sensitivity analysis to evaluate parameter importance and uncertainty.
Sensitivity analysis quantifies how model parameters influence simulation results and performance metrics. This analysis helps:
Identify which parameters have the most significant impact on model performance
Quantify parameter uncertainty and its effect on predictions
Guide model simplification by identifying insensitive parameters
Inform calibration strategies by focusing on sensitive parameters
The method iterates through configured hydrological models, running model-specific sensitivity analyses where supported. Uses AnalysisRegistry for model-specific analyzers when available, falling back to generic SensitivityAnalyzer for models without custom implementations.
- Returns:
Dictionary mapping model names to sensitivity results, or None if the analysis was disabled or failed
- Return type:
Optional[Dict]
- Raises:
FileNotFoundError – If required optimization results are missing
Exception – For other errors during sensitivity analysis
- run_koopman_analysis(ensemble_df, obs_streamflow, train_end='2007-12-31', eval_start='2008-01-01', rank=None, hankel_d=1, svd_threshold=0.99, dmd_method='fbdmd', analyzer_name='DEFAULT')[source]#
Run Koopman operator analysis of a multi-model hydrological ensemble.
Uses EDMD with structurally diverse model outputs as lifting functions to approximate the Koopman operator. The dictionary contains only model outputs (no observed streamflow), ensuring a fair comparison with the ensemble mean. Observed streamflow is predicted via Ridge regression from the Koopman eigenspace.
- Parameters:
ensemble_df (pandas.DataFrame) – (T, N) DataFrame of model outputs (m^3/s), aligned daily
obs_streamflow (pandas.Series) – (T,) Series of observed streamflow (m^3/s)
train_end (str) – Last training date (default “2007-12-31”)
eval_start (str) – First evaluation date (default “2008-01-01”)
rank (int | None) – Explicit DMD rank (None = auto from SVD threshold)
hankel_d (int) – Hankel delay depth for model outputs (1 = no embedding)
svd_threshold (float) – Cumulative energy threshold for rank selection
dmd_method (str) – ‘standard’ or ‘fbdmd’ (default ‘fbdmd’)
analyzer_name (str) – Registry key for the Koopman analyzer (default “DEFAULT”)
- Returns:
Dict with eigenvalues, timescales, metrics, mode_loadings, etc., or None if analysis failed.
- Return type:
Dict | None
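The svd_threshold parameter selects a rank from a cumulative energy criterion. A minimal sketch of one common form of that rule is shown below. The function select_rank is hypothetical, and defining "energy" as squared singular values is an assumption, since some implementations accumulate the raw singular values instead.

```python
from typing import Sequence


def select_rank(singular_values: Sequence[float], threshold: float = 0.99) -> int:
    """Smallest rank whose cumulative squared-singular-value share >= threshold.

    Assumes singular_values is sorted in descending order, as returned by an SVD.
    """
    energy = [s * s for s in singular_values]
    total = sum(energy)
    if total == 0.0:
        return 0  # degenerate input: nothing to retain
    running = 0.0
    for rank, e in enumerate(energy, start=1):
        running += e
        if running / total >= threshold:
            return rank
    return len(energy)


# Toy spectrum (invented): one dominant mode and a quickly decaying tail.
demo_spectrum = [10.0, 3.0, 1.0, 0.1]
demo_rank = select_rank(demo_spectrum, threshold=0.99)
```

Passing an explicit rank to run_koopman_analysis() bypasses this kind of automatic selection, as the rank parameter description states.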
- run_decision_analysis()[source]#
Run decision analysis to assess the impact of model structure choices.
Decision analysis evaluates different model structure configurations (e.g., process representations, parameterizations) to understand their impact on model performance.
Uses AnalysisRegistry to discover model-specific decision analyzers. Each model can register its own analyzer that implements the run_full_analysis() interface returning (results_file, best_combinations).
- Returns:
Dictionary mapping model names to decision analysis results, or None if the analysis was disabled or failed
- Return type:
Optional[Dict]
- get_analysis_status()[source]#
Get status of various analyses.
This method provides a comprehensive status report on the analysis operations. It checks for the existence of key files and directories to determine which analyses have been completed successfully and which are available to run.
The status information includes:
- Whether benchmarking has been completed
- Whether sensitivity analysis is available and its results exist
- Whether decision analysis is available and its results exist
- Whether optimization results (required for some analyses) exist
This information is useful for tracking progress, diagnosing issues, and providing feedback to users.
- Returns:
Dictionary containing analysis status information, including flags for completed analyses and available results
- Return type:
Dict[str, Any]
- run_multivariate_evaluation(sim_results)[source]#
Run multivariate evaluation against all available observations.
- Parameters:
sim_results (Dict[str, pandas.Series])
- Return type:
Dict[str, Dict[str, float]]
- validate_analysis_requirements()[source]#
Validate that requirements are met for running analyses.
Deprecated: Use validate_readiness() instead.
- Returns:
Dictionary indicating which analyses can be run.
- Return type:
Dict[str, bool]
- validate_readiness()[source]#
Validate that this manager is ready for execution.
Checks runtime prerequisites: processed observations, optimization results (for sensitivity), and simulation outputs (for decision analysis). All analyses require processed observations as a baseline.
- Returns:
Dict mapping check names to pass/fail booleans.
- Return type:
Dict[str, bool]
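Since validate_readiness() returns a mapping of check names to booleans, a caller will typically want the list of failed checks before starting an analysis. A small sketch follows. The helper failed_checks and the check names in the demo dict are hypothetical, because the actual keys are not listed in this reference.

```python
from typing import Dict, List


def failed_checks(readiness: Dict[str, bool]) -> List[str]:
    """Return the names of readiness checks that did not pass, sorted."""
    return sorted(name for name, passed in readiness.items() if not passed)


# Hypothetical check names; real keys come from validate_readiness().
demo_readiness = {
    "processed_observations": True,
    "optimization_results": False,
    "simulation_outputs": True,
}
demo_failed = failed_checks(demo_readiness)
```

An empty list means every reported prerequisite passed.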
Key Methods:
from symfluence.evaluation.analysis_manager import AnalysisManager
am = AnalysisManager(config, logger)
# Run benchmarking analysis
am.run_benchmarking()
# Run sensitivity analysis
am.run_sensitivity_analysis()
# Run decision analysis
am.run_decision_analysis()
Workflow Orchestrator#
Manages workflow step execution and dependencies.
Workflow orchestration for SYMFLUENCE hydrological modeling pipeline.
Coordinates the execution sequence of modeling steps including domain definition, data preprocessing, model execution, optimization, and analysis phases.
- class symfluence.project.workflow_orchestrator.WorkflowStep(name, cli_name, func, check_func, description)[source]#
Bases:
ConfigMixin
Represents a single step in the SYMFLUENCE workflow.
- Parameters:
name (str)
cli_name (str)
func (Callable)
check_func (Callable)
description (str)
- name: str#
- cli_name: str#
- func: Callable#
- check_func: Callable#
- description: str#
- class symfluence.project.workflow_orchestrator.WorkflowOrchestrator(managers, config, logger, logging_manager=None, provenance=None)[source]#
Bases:
ConfigMixin
Orchestrates the SYMFLUENCE workflow execution and manages the step sequence.
The WorkflowOrchestrator is responsible for defining, coordinating, and executing the complete SYMFLUENCE modeling workflow. It integrates the various manager components into a coherent sequence of operations, handling dependencies between steps, tracking progress, and providing status information.
Key responsibilities:
- Defining the sequence of workflow steps and their validation checks
- Coordinating execution across different manager components
- Handling execution flow (skipping completed steps, stopping on errors)
- Providing status information and execution reports
- Validating prerequisites before workflow execution
This class represents the “conductor” of the SYMFLUENCE system, ensuring that each component performs its tasks in the correct order and with the necessary inputs from previous steps.
- Parameters:
managers (Dict[str, Any])
config (SymfluenceConfig | Dict[str, Any])
logger (Logger)
- managers#
Dictionary of manager instances
- Type:
Dict[str, Any]
- config#
Typed configuration object
- Type:
SymfluenceConfig
- logger#
Logger instance
- Type:
logging.Logger
- domain_name#
Name of the hydrological domain
- Type:
str
- experiment_id#
ID of the current experiment
- Type:
str
- project_dir#
Path to the project directory
- Type:
Path
- logging_manager#
Reference to logging manager for enhanced formatting
- __init__(managers, config, logger, logging_manager=None, provenance=None)[source]#
Initialize the workflow orchestrator.
- Parameters:
managers (Dict[str, Any]) – Dictionary of manager instances for each functional area
config (SymfluenceConfig | Dict[str, Any]) – SymfluenceConfig instance (dicts are auto-converted)
logger (Logger) – Logger instance for recording operations
logging_manager – Reference to LoggingManager for enhanced formatting
provenance – Optional RunProvenance instance for step-level tracking
- Raises:
KeyError – If essential configuration values are missing
- define_workflow_steps()[source]#
Define the workflow steps with their output validation checks and descriptions.
- Returns:
List of WorkflowStep objects
- Return type:
List[WorkflowStep]
- run_workflow(force_run=False)[source]#
Run the complete workflow according to the defined steps.
This method executes each step in the workflow sequence, handling:
- Conditional execution based on existing outputs
- Error handling with configurable stop-on-error behavior
- Progress tracking and timing information
- Comprehensive logging of each operation
The workflow can be configured to:
- Skip steps that have already been completed (default)
- Force re-execution of all steps (force_run=True)
- Continue or stop on errors (based on STOP_ON_ERROR config)
- Parameters:
force_run (bool) – If True, forces execution of all steps even if outputs exist. If False (default), skips steps with existing outputs.
- Raises:
Exception – If a step fails and STOP_ON_ERROR is True in configuration
Note
The method provides detailed logging throughout execution, including:
- Step headers with progress indicators
- Execution timing for each step
- Clear success/skip/failure indicators
- Final summary statistics
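The skip, force and stop-on-error behaviour of run_workflow() can be sketched with a small loop. The Step dataclass reuses the name, func and check_func fields documented for WorkflowStep, but run_steps, its outcome labels, and the stop_on_error argument are hypothetical stand-ins for the real orchestrator and its STOP_ON_ERROR setting.

```python
from dataclasses import dataclass
from typing import Callable, List, Set, Tuple


@dataclass
class Step:
    name: str
    func: Callable[[], None]
    check_func: Callable[[], bool]  # True when the step's outputs already exist


def run_steps(
    steps: List[Step],
    force_run: bool = False,
    stop_on_error: bool = True,
) -> List[Tuple[str, str]]:
    """Run steps in order; label each 'skipped', 'completed' or 'failed'."""
    outcomes: List[Tuple[str, str]] = []
    for step in steps:
        if not force_run and step.check_func():
            outcomes.append((step.name, "skipped"))  # outputs exist already
            continue
        try:
            step.func()
        except Exception:
            outcomes.append((step.name, "failed"))
            if stop_on_error:
                raise  # abort the whole workflow
            continue   # otherwise move on to the next step
        outcomes.append((step.name, "completed"))
    return outcomes


done: Set[str] = set()
demo_steps = [
    Step("setup_project", lambda: done.add("setup_project"), lambda: True),
    Step("define_domain", lambda: done.add("define_domain"), lambda: False),
]
demo_outcomes = run_steps(demo_steps)
```

With force_run=True the check_func result is ignored and every step executes again.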
- validate_workflow_prerequisites()[source]#
Validate that all prerequisites are met before running the workflow.
Config-level validation (required keys, types, ranges) is handled by Pydantic at SymfluenceConfig construction time. This method focuses on runtime prerequisites: manager initialization and manager readiness.
- Returns:
True if all prerequisites are met, False otherwise
- Return type:
bool
- run_individual_steps(step_names, continue_on_error=False)[source]#
Execute a specific list of workflow steps by their CLI names.
- Parameters:
step_names (List[str]) – List of step CLI names to execute
continue_on_error (bool) – Whether to continue to next step if one fails
- Returns:
List of dictionaries containing execution results for each step
- Return type:
List[Dict[str, Any]]
- get_workflow_status()[source]#
Get the current status of the workflow execution.
This method examines each step in the workflow to determine whether it has been completed, using the same output validation checks used during execution. It provides a comprehensive view of workflow progress, including which steps are complete and which are pending.
The status information is useful for:
- Monitoring long-running workflows
- Generating progress reports
- Diagnosing execution issues
- Providing feedback to users
- Returns:
- Dictionary containing workflow status information, including:
total_steps: Total number of workflow steps
completed_steps: Number of completed steps
pending_steps: Number of pending steps
step_details: List of dictionaries with details for each step (name and completion status)
- Return type:
Dict[str, Any]
Usage:
from symfluence.project.workflow_orchestrator import WorkflowOrchestrator
# managers is a Dict[str, Any] of manager instances; argument order follows the class signature
orchestrator = WorkflowOrchestrator(managers, config, logger)
# Run full workflow
orchestrator.run_workflow()
# Run specific steps by their CLI names
orchestrator.run_individual_steps(["calibrate_model"])
# Get workflow status
status = orchestrator.get_workflow_status()
Model Registry#
The Model Registry enables plugin-style model integration.
Central registry for model components (preprocessors, runners, postprocessors).
Facade over ComponentRegistry, ConfigRegistry, and ResultExtractorRegistry. Models self-register via decorators at import time; the workflow layer queries by model name to discover and instantiate components.
- class symfluence.models.registry.ModelRegistry[source]#
Bases:
object
Facade over ComponentRegistry, ConfigRegistry, and ResultExtractorRegistry.
Register components via decorators (@ModelRegistry.register_runner('SUMMA')) and look them up by model name (ModelRegistry.get_runner('SUMMA')). All delegation methods are one-liner pass-throughs to the sub-registries.
- classmethod register_preprocessor(model_name)[source]#
Register a preprocessor class for a model.
- Parameters:
model_name (str)
- Return type:
Callable[[Type], Type]
- classmethod register_runner(model_name, method_name='run')[source]#
Register a runner class for a model.
- Parameters:
model_name (str)
method_name (str)
- Return type:
Callable[[Type], Type]
- classmethod register_postprocessor(model_name)[source]#
Register a postprocessor class for a model.
- Parameters:
model_name (str)
- Return type:
Callable[[Type], Type]
- classmethod register_visualizer(model_name)[source]#
Register a visualization function for a model.
- Parameters:
model_name (str)
- Return type:
Callable[[Callable], Callable]
- classmethod get_preprocessor(model_name)[source]#
Get preprocessor class for a model.
- Parameters:
model_name (str)
- Return type:
Type | None
- classmethod get_runner(model_name)[source]#
Get runner class for a model.
- Parameters:
model_name (str)
- Return type:
Type | None
- classmethod get_postprocessor(model_name)[source]#
Get postprocessor class for a model.
- Parameters:
model_name (str)
- Return type:
Type | None
- classmethod get_visualizer(model_name)[source]#
Get visualizer function for a model.
- Parameters:
model_name (str)
- Return type:
Callable | None
- classmethod get_runner_method(model_name)[source]#
Get the runner method name for a model.
- Parameters:
model_name (str)
- Return type:
str
- classmethod list_models()[source]#
List all models with registered components.
- Return type:
list[str]
- classmethod get_model_components(model_name)[source]#
Get all registered component classes for a model.
- Parameters:
model_name (str)
- Return type:
Dict[str, Any]
- classmethod validate_model_registration(model_name, require_all=False)[source]#
Validate that a model has all required components registered.
- Parameters:
model_name (str)
require_all (bool)
- Return type:
Dict[str, Any]
- classmethod validate_all_models(require_all=False, logger=None)[source]#
Validate registration status of all registered models.
- Parameters:
require_all (bool)
logger (Logger)
- Return type:
Dict[str, Dict[str, Any]]
- classmethod register_config_adapter(model_name)[source]#
Register a complete config adapter for a model.
- Parameters:
model_name (str)
- Return type:
Callable[[Type], Type]
- classmethod register_config_schema(model_name, schema)[source]#
Register Pydantic config schema for a model.
- Parameters:
model_name (str)
schema (Type)
- Return type:
Type
- classmethod register_config_defaults(model_name, defaults)[source]#
Register default configuration values for a model.
- Parameters:
model_name (str)
defaults (Dict[str, Any])
- Return type:
Dict[str, Any]
- classmethod register_config_transformers(model_name, transformers)[source]#
Register flat-to-nested field transformers for a model.
- Parameters:
model_name (str)
transformers (Dict[str, Tuple[str, ...]])
- Return type:
Dict[str, Tuple[str, ...]]
- classmethod register_config_validator(model_name, validator)[source]#
Register custom validation function for a model.
- Parameters:
model_name (str)
validator (Callable)
- Return type:
Callable
- classmethod get_config_adapter(model_name)[source]#
Get config adapter instance for a model.
- Parameters:
model_name (str)
- Return type:
Any | None
- classmethod get_config_schema(model_name)[source]#
Get Pydantic config schema for a model.
- Parameters:
model_name (str)
- Return type:
Type | None
- classmethod get_config_defaults(model_name)[source]#
Get default configuration for a model.
- Parameters:
model_name (str)
- Return type:
Dict[str, Any]
- classmethod get_config_transformers(model_name)[source]#
Get flat-to-nested transformers for a model.
- Parameters:
model_name (str)
- Return type:
Dict[str, Tuple[str, ...]]
- classmethod get_config_validator(model_name)[source]#
Get config validator function for a model.
- Parameters:
model_name (str)
- Return type:
Callable | None
- classmethod validate_model_config(model_name, config)[source]#
Validate model configuration using registered validator.
- Parameters:
model_name (str)
config (Dict[str, Any])
- Return type:
None
- classmethod register_result_extractor(model_name)[source]#
Register a result extractor for a model.
- Parameters:
model_name (str)
- Return type:
Callable[[Type], Type]
- classmethod get_result_extractor(model_name)[source]#
Get result extractor instance for a model.
- Parameters:
model_name (str)
- Return type:
Any | None
- classmethod has_result_extractor(model_name)[source]#
Check if a model has a registered result extractor.
- Parameters:
model_name (str)
- Return type:
bool
- classmethod list_result_extractors()[source]#
List all models with registered result extractors.
- Return type:
list[str]
- classmethod get_forcing_adapter(model_name, config, logger=None)[source]#
Get forcing adapter instance for a model.
- Parameters:
model_name (str) – Model name
config (Dict) – Configuration dictionary
logger – Optional logger instance
- Returns:
ForcingAdapter instance or None if not registered
- Return type:
Any | None
Registering Models#
from symfluence.models.registry import ModelRegistry

# Register preprocessor
@ModelRegistry.register_preprocessor('MY_MODEL')
class MyPreProcessor:
    def __init__(self, config, logger):
        self.config = config
        self.logger = logger

    def run_preprocessing(self):
        # Preprocessing logic
        pass

# Register runner
@ModelRegistry.register_runner('MY_MODEL', method_name='run_my_model')
class MyRunner:
    def __init__(self, config, logger, reporting_manager=None):
        self.config = config
        self.logger = logger

    def run_my_model(self):
        # Model execution logic
        pass

# Register postprocessor
@ModelRegistry.register_postprocessor('MY_MODEL')
class MyPostProcessor:
    def __init__(self, config, logger, reporting_manager=None):
        self.config = config
        self.logger = logger

    def extract_streamflow(self):
        # Result extraction logic
        pass
Querying Registry#
from symfluence.models.registry import ModelRegistry
# List registered models
models = ModelRegistry.list_models()
# ['SUMMA', 'FUSE', 'GR', 'HYPE', 'NGEN', ...]
# Get specific components
preprocessor_cls = ModelRegistry.get_preprocessor('SUMMA')
runner_cls = ModelRegistry.get_runner('SUMMA')
postprocessor_cls = ModelRegistry.get_postprocessor('SUMMA')
# Check if model is registered
is_registered = ModelRegistry.is_registered('MY_MODEL')
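The registration decorators above return `Callable[[Type], Type]`, which is the usual shape of a class-decorator registry. The sketch below shows that pattern in isolation. `MiniRegistry` is a hypothetical stand-in written for illustration; it is not how `ModelRegistry` is implemented.

```python
from typing import Callable, Dict, List, Optional, Type


class MiniRegistry:
    """Hypothetical stand-in for a class-level component registry."""

    _runners: Dict[str, Type] = {}

    @classmethod
    def register_runner(cls, model_name: str) -> Callable[[Type], Type]:
        def decorator(runner_cls: Type) -> Type:
            cls._runners[model_name] = runner_cls
            return runner_cls  # return the class unchanged so it stays usable
        return decorator

    @classmethod
    def get_runner(cls, model_name: str) -> Optional[Type]:
        return cls._runners.get(model_name)

    @classmethod
    def list_models(cls) -> List[str]:
        return sorted(cls._runners)


@MiniRegistry.register_runner("MY_MODEL")
class MyRunner:
    def run(self) -> str:
        return "ran MY_MODEL"
```

Because the decorator returns the class unchanged, registration happens as a side effect of importing the module that defines the class.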
Optimization API#
Base Optimizer#
Base Model Optimizer
Abstract base class providing unified optimization infrastructure for all hydrological models. Implements the template method pattern: model-specific operations are delegated to subclasses, while algorithm execution, parallel processing, results tracking, and final evaluation workflows are centralized.
- Mixin Components:
ConfigurableMixin, ParallelExecutionMixin, ResultsTrackingMixin, RetryExecutionMixin, GradientOptimizationMixin
- Abstract Methods (Must Implement in Subclass):
_get_model_name() -> str
_run_model_for_final_evaluation(output_dir) -> bool
_get_final_file_manager_path() -> Path
- Optional Overrides:
_create_parameter_manager(), _create_calibration_target(), _create_worker(), _apply_best_parameters_for_final(), _get_settings_directory()
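The template method pattern described above can be sketched without SYMFLUENCE itself. In this illustration the three abstract method names mirror the ones listed above, while `SketchOptimizer`, `ToyOptimizer`, `describe_final_run`, and the file name are hypothetical.

```python
from abc import ABC, abstractmethod
from pathlib import Path


class SketchOptimizer(ABC):
    """Hypothetical base class; the shared workflow lives here."""

    def describe_final_run(self, output_dir: Path) -> str:
        # Template method: fixed sequence, model-specific steps delegated
        name = self._get_model_name()
        ok = self._run_model_for_final_evaluation(output_dir)
        manager = self._get_final_file_manager_path()
        status = "ok" if ok else "failed"
        return f"{name}: {status} ({manager.name})"

    @abstractmethod
    def _get_model_name(self) -> str: ...

    @abstractmethod
    def _run_model_for_final_evaluation(self, output_dir: Path) -> bool: ...

    @abstractmethod
    def _get_final_file_manager_path(self) -> Path: ...


class ToyOptimizer(SketchOptimizer):
    def _get_model_name(self) -> str:
        return "TOY"

    def _run_model_for_final_evaluation(self, output_dir: Path) -> bool:
        return True  # a real subclass would launch the model here

    def _get_final_file_manager_path(self) -> Path:
        return Path("settings") / "fileManager.txt"  # hypothetical file name


summary = ToyOptimizer().describe_final_run(Path("final_evaluation"))
```

A subclass that omits any of the three abstract methods cannot be instantiated, which is how the base class enforces the contract.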
- class symfluence.optimization.optimizers.base_model_optimizer.BaseModelOptimizer(config, logger, optimization_settings_dir=None, reporting_manager=None)[source]#
Bases:
ConfigurableMixin, ParallelExecutionMixin, ResultsTrackingMixin, RetryExecutionMixin, GradientOptimizationMixin, ABC
Abstract base class for model-specific optimizers.
Implements template method pattern providing unified optimization across all hydrological models. Uses mixins for parallel execution, results tracking, retry logic, and gradient-based optimization.
Subclasses must implement: _get_model_name(), _run_model_for_final_evaluation(), _get_final_file_manager_path(). Components (param_manager, worker, calibration_target) are created via overridable factory methods with registry-based defaults.
- Parameters:
config (SymfluenceConfig | Dict[str, Any])
logger (Logger)
optimization_settings_dir (Path | None)
reporting_manager (Any | None)
- DEFAULT_ITERATIONS = 100#
- DEFAULT_POPULATION_SIZE = 30#
- DEFAULT_PENALTY_SCORE = -9999.0#
- __init__(config, logger, optimization_settings_dir=None, reporting_manager=None)[source]#
Initialize the model optimizer.
- Parameters:
config (SymfluenceConfig | Dict[str, Any]) – Configuration (typed SymfluenceConfig or legacy dict)
logger (Logger) – Logger instance
optimization_settings_dir (Path | None) – Optional path to optimization settings
reporting_manager (Any | None) – ReportingManager instance
- property task_builder: TaskBuilder#
Lazy-initialized task builder.
- property population_evaluator: PopulationEvaluator#
Lazy-initialized population evaluator.
- property results_saver: FinalResultsSaver#
Lazy-initialized results saver.
- property final_orchestrator: FinalEvaluationOrchestrator#
Lazy-initialized final evaluation orchestrator.
- log_iteration_progress(algorithm_name, iteration, best_score, secondary_score=None, secondary_label=None, n_improved=None, population_size=None, crash_stats=None)[source]#
Log optimization progress. Delegates to EvaluationMetricsTracker.
- Parameters:
algorithm_name (str)
iteration (int)
best_score (float)
secondary_score (float | None)
secondary_label (str | None)
n_improved (int | None)
population_size (int | None)
crash_stats (Dict[str, Any] | None)
- Return type:
None
- log_initial_population(algorithm_name, population_size, best_score)[source]#
Log initial population completion. Delegates to EvaluationMetricsTracker.
- Parameters:
algorithm_name (str)
population_size (int)
best_score (float)
- Return type:
None
- get_crash_stats()[source]#
Return crash rate statistics. Delegates to EvaluationMetricsTracker.
- Return type:
Dict[str, Any]
- run_optimization(algorithm_name)[source]#
Run optimization using a specified algorithm from the registry.
- Parameters:
algorithm_name (str) – Algorithm name (case-insensitive)
- Returns:
Path to results JSON file
- Return type:
Path
- run_cmaes()[source]#
Run CMA-ES (Covariance Matrix Adaptation Evolution Strategy) optimization.
- Return type:
Path
- run_dream()[source]#
Run DREAM (DiffeRential Evolution Adaptive Metropolis) optimization.
- Return type:
Path
- run_glue()[source]#
Run GLUE (Generalized Likelihood Uncertainty Estimation) analysis.
- Return type:
Path
- run_bayesian_opt()[source]#
Run Bayesian Optimization with Gaussian Process surrogate.
- Return type:
Path
- run_moead()[source]#
Run MOEA/D (Multi-Objective Evolutionary Algorithm based on Decomposition).
- Return type:
Path
- run_abc()[source]#
Run Approximate Bayesian Computation (ABC-SMC) for likelihood-free inference.
- Return type:
Path
- run_adam(steps=100, lr=0.01)[source]#
Run Adam gradient-based optimization.
- Parameters:
steps (int) – Number of optimization steps (passed via config ADAM_STEPS)
lr (float) – Learning rate (passed via config ADAM_LR)
- Returns:
Path to results file
- Return type:
Path
- run_lbfgs(steps=50, lr=0.1)[source]#
Run L-BFGS gradient-based optimization.
- Parameters:
steps (int) – Maximum number of steps (passed via config LBFGS_STEPS)
lr (float) – Initial step size (passed via config LBFGS_LR)
- Returns:
Path to results file
- Return type:
Path
- run_final_evaluation(best_params)[source]#
Run final evaluation with best parameters over full experiment period.
Evaluates the model on both calibration and evaluation windows, then restores settings for reproducibility. Subclasses may override for custom behavior.
- Parameters:
best_params (Dict[str, float]) – Best parameters from optimization (physical units)
- Returns:
Dict with ‘final_metrics’, ‘calibration_metrics’, ‘evaluation_metrics’, ‘success’, ‘best_params’, or None if failed
- Return type:
Dict[str, Any] | None
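Since `run_final_evaluation` is documented to return either a dictionary or `None`, callers need to handle both cases. The helper below is a minimal sketch of that handling. It assumes the metric entries are dictionaries mapping metric names to floats, which the reference above does not state; `summarize_final_evaluation` and the example values are hypothetical.

```python
from typing import Any, Dict, Optional


def summarize_final_evaluation(result: Optional[Dict[str, Any]]) -> str:
    """Return a one-line summary of a run_final_evaluation-style result."""
    if result is None:
        return "final evaluation failed"
    if not result.get("success", False):
        return "final evaluation did not succeed"
    # Assumption: each metrics entry maps metric name -> float
    cal = result.get("calibration_metrics") or {}
    ev = result.get("evaluation_metrics") or {}
    parts = [f"cal {k}={v:.2f}" for k, v in sorted(cal.items())]
    parts += [f"eval {k}={v:.2f}" for k, v in sorted(ev.items())]
    return ", ".join(parts) if parts else "no metrics reported"


# Made-up values that only mirror the documented key names
example_result = {
    "success": True,
    "best_params": {"k": 0.5},
    "final_metrics": {"KGE": 0.8},
    "calibration_metrics": {"KGE": 0.8},
    "evaluation_metrics": {"KGE": 0.7},
}
```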
DDS Algorithm#
Dynamically Dimensioned Search (DDS) is accessed through the BaseModelOptimizer interface.
Model-specific optimizers inherit from BaseModelOptimizer and use the DDS algorithm
via the run_dds() method.
Usage:
# DDS is invoked through model-specific optimizers
from symfluence.optimization import OptimizationManager
opt_manager = OptimizationManager(config, logger)
results = opt_manager.calibrate_model() # Uses algorithm from config
# Or directly via model optimizer
# optimizer.run_dds() # Runs DDS optimization
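For readers unfamiliar with DDS, the following is a self-contained sketch of the published algorithm (Tolson and Shoemaker, 2007) in its minimization form. It is an illustration only: `dds_minimize` is a hypothetical function, and the behavior of `run_dds()` in SYMFLUENCE (objective direction, parameter normalization, parallelism) may differ.

```python
import math
import random
from typing import Callable, List, Sequence, Tuple


def dds_minimize(
    objective: Callable[[Sequence[float]], float],
    bounds: List[Tuple[float, float]],
    max_iter: int = 200,
    r: float = 0.2,
    seed: int = 0,
) -> Tuple[List[float], float]:
    """Minimize `objective` with a basic DDS loop (greedy, single solution)."""
    if max_iter < 2:
        raise ValueError("max_iter must be at least 2")
    rng = random.Random(seed)
    best = [rng.uniform(lo, hi) for lo, hi in bounds]
    best_score = objective(best)
    n = len(bounds)
    for i in range(1, max_iter + 1):
        # Probability of perturbing each dimension shrinks as iterations pass
        p = 1.0 - math.log(i) / math.log(max_iter)
        dims = [j for j in range(n) if rng.random() < p]
        if not dims:
            dims = [rng.randrange(n)]  # always perturb at least one dimension
        cand = list(best)
        for j in dims:
            lo, hi = bounds[j]
            x = cand[j] + r * (hi - lo) * rng.gauss(0.0, 1.0)
            # Reflect at the bounds; fall back to the bound if still outside
            if x < lo:
                x = lo + (lo - x)
                if x > hi:
                    x = lo
            elif x > hi:
                x = hi - (x - hi)
                if x < lo:
                    x = hi
            cand[j] = x
        score = objective(cand)
        if score <= best_score:  # greedy acceptance
            best, best_score = cand, score
    return best, best_score


def sphere(x: Sequence[float]) -> float:
    return sum(v * v for v in x)


best_x, best_f = dds_minimize(sphere, [(-5.0, 5.0)] * 3, max_iter=500, seed=1)
```

The search starts globally (all dimensions perturbed) and becomes increasingly local, which is why DDS needs only an iteration budget and the neighborhood size `r` as settings.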
Algorithm Selection#
# Configure algorithm in YAML
# OPTIMIZATION_ALGORITHM: DDS # or DE, PSO, SCE-UA, NSGA-II
# Programmatic algorithm selection
from symfluence.optimization.optimization_manager import OptimizationManager
opt = OptimizationManager(config, logger)
# Available algorithms
algorithms = ['DDS', 'DE', 'PSO', 'SCE-UA', 'NSGA-II', 'ADAM', 'LBFGS']
Data Acquisition#
Acquisition Service#
Acquisition Service
Unified facade for all data acquisition workflows in SYMFLUENCE. Coordinates downloading and processing of geospatial attributes, forcing data, and observations from diverse sources (cloud, HPC, local). Acts as high-level orchestrator delegating to specialized acquisition handlers and cloud downloaders.
- Architecture:
AcquisitionService provides two parallel acquisition paths:
CLOUD Mode (CloudForcingDownloader):
- Cloud-based data providers with direct HTTP/S3 access
- DEM sources: Copernicus GLO-30/90, FABDEM, NASADEM, SRTM, ETOPO, Mapzen, ALOS
- Soil class: SoilGrids via WCS subsetting
- Land cover: MODIS Landcover (multi-year mode), USGS NLCD
- Forcing: ERA5 (CDS), CARRA/CERRA (CDS), AORC (AWS/GCS), NEX-GDDP (Zenodo)
- Observations: USGS, WSC, SMHI, SNOTEL, GRACE, MODIS snow/ET
MAF Mode (gistoolRunner, datatoolRunner):
- HPC-based data access via external MAF tools on supercomputers
- gistool: MERIT-Hydro elevation, MODIS landcover, SoilGrids soil class
- datatool: ERA5, RDRS, CASR forcing data with Slurm job monitoring
- Configuration: Generates MAF JSON configs and executes MAF scheduler
- Output: Same directory structure as CLOUD mode
- Data Acquisition Workflows:
Attribute Acquisition (acquire_attributes):
- DEM/elevation: Multiple sources with fallback logic
- Soil classification: SoilGrids primary, gistool fallback
- Land cover: MODIS or USGS depending on availability
- Output: GeoTIFF rasters at project_dir/attributes/{type}/
Forcing Data Download (acquire_forcings):
- Datasets: ERA5, CARRA, CERRA, AORC, NEX-GDDP
- Mode selection: CLOUD vs MAF based on config.domain.data_access
- Caching: RawForcingCache with automatic TTL/checksum validation
- Unit conversion: Via VariableHandler for dataset-specific mappings
- Output: NetCDF at project_dir/forcing/{dataset}_raw/
Observation Data Retrieval (acquire_observations):
- Streamflow: USGS (NWIS), WSC (Canada), SMHI (Nordic)
- Gridded: GRACE, MODIS Snow, MODIS ET, FLUXNET
- Point sensors: SNOTEL (NOAA snow/precip/temp)
- Output: CSV at project_dir/observations/{type}/processed/
EM-Earth Supplementary Data (acquire_em_earth_forcings):
- Gridded ERA5 re-analysis supplementing point/coarse data
- Subsetting: Via bounding box
- Averaging: Spatial mean over domain
- Output: NetCDF at project_dir/forcing/em_earth_supplementary/
- Configuration Parameters:
- Data Source Selection:
domain.data_access: ‘CLOUD’ or ‘MAF’ (default: ‘MAF’)
domain.dem_source: ‘merit_hydro’, ‘copernicus’, ‘copdem90’, ‘fabdem’, ‘nasadem’, ‘srtm’, ‘etopo’, ‘mapzen’, ‘alos’
domain.land_class_source: ‘modis’, ‘usgs_nlcd’ (cloud only)
domain.bounding_box_coords: ‘lat_min/lon_min/lat_max/lon_max’
- Download Flags:
domain.download_dem: Enable DEM acquisition (default: True)
domain.download_soil: Enable soil class acquisition (default: True)
domain.download_landcover: Enable land cover acquisition (default: True)
- Observation Sources:
optimization.observation_variables: List of variables to download
evaluation.targets: Evaluation targets (e.g., ‘streamflow’)
- MAF Configuration:
domain.hpc_account: HPC account for job submission
domain.hpc_cache_dir: HPC cache directory
domain.hpc_job_timeout: Max seconds to wait for jobs
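The `domain.bounding_box_coords` setting listed under Data Source Selection is a slash-separated string. The sketch below parses and validates a string in the documented `lat_min/lon_min/lat_max/lon_max` order. `BoundingBox`, `parse_bounding_box`, and the example coordinates are hypothetical, and boxes crossing the antimeridian are deliberately not handled.

```python
from typing import NamedTuple


class BoundingBox(NamedTuple):
    lat_min: float
    lon_min: float
    lat_max: float
    lon_max: float


def parse_bounding_box(coords: str) -> BoundingBox:
    """Parse 'lat_min/lon_min/lat_max/lon_max' into floats and validate it."""
    parts = coords.strip().split("/")
    if len(parts) != 4:
        raise ValueError(f"expected 4 '/'-separated values, got {len(parts)}")
    box = BoundingBox(*(float(p) for p in parts))
    if not (-90.0 <= box.lat_min <= box.lat_max <= 90.0):
        raise ValueError("latitudes must satisfy -90 <= lat_min <= lat_max <= 90")
    if box.lon_min > box.lon_max:
        # Simplification: antimeridian-crossing boxes are rejected in this sketch
        raise ValueError("lon_min must not exceed lon_max")
    return box


# Illustrative coordinates only
example_box = parse_bounding_box("50.95/-116.55/51.74/-115.52")
```

Validating the string once, early, matches the "validate early and report clearly" approach to configuration errors described in this module.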
- Caching and Error Handling:
Raw Forcing Cache:
- RawForcingCache manages downloaded forcing files
- TTL: Files cached for configurable duration (default: 30 days)
- Validation: Checksum-based integrity checking
- Fallback: Automatic re-download if cache corrupted
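The TTL and checksum checks described for the raw forcing cache can be illustrated with the standard library alone. This is a sketch of the general technique, not the `RawForcingCache` implementation: the function names, the use of SHA-256, and the use of file modification time as the cache timestamp are all assumptions.

```python
import hashlib
import tempfile
import time
from pathlib import Path
from typing import Optional


def sha256_of(path: Path) -> str:
    """Hex SHA-256 digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()


def is_cache_entry_valid(
    path: Path,
    expected_sha256: str,
    ttl_seconds: float = 30 * 24 * 3600,
    now: Optional[float] = None,
) -> bool:
    """True if the file exists, is younger than the TTL, and matches the checksum."""
    if not path.is_file():
        return False
    current = time.time() if now is None else now
    if current - path.stat().st_mtime > ttl_seconds:
        return False  # expired: caller should download again
    return sha256_of(path) == expected_sha256


with tempfile.TemporaryDirectory() as tmp:
    cached = Path(tmp) / "forcing.nc"
    cached.write_bytes(b"example bytes")
    checksum = sha256_of(cached)
    fresh_ok = is_cache_entry_valid(cached, checksum)
    corrupt_ok = is_cache_entry_valid(cached, "0" * 64)
    expired_ok = is_cache_entry_valid(cached, checksum, now=time.time() + 31 * 24 * 3600)
```

A `False` result covers all three failure modes (missing, expired, corrupted), so the caller can respond uniformly by downloading the file again.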
Error Recovery:
- Network failures: Retry with exponential backoff
- Partial downloads: Cleanup and retry
- Missing data: Warn and continue with available sources
- Configuration errors: Validate early and report clearly
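Retry with exponential backoff, as named under Error Recovery, can be sketched as follows. `retry_with_backoff`, its defaults, and the choice of exception types are hypothetical; the retry counts and delays actually used by SYMFLUENCE are not documented here.

```python
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    action: Callable[[], T],
    retries: int = 4,
    base_delay: float = 1.0,
    factor: float = 2.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `action`, waiting base_delay * factor**attempt between failures."""
    if retries < 0:
        raise ValueError("retries must be >= 0")
    for attempt in range(retries + 1):
        try:
            return action()
        except retry_on:
            if attempt == retries:
                raise  # out of attempts: surface the last error
            sleep(base_delay * factor ** attempt)
    raise RuntimeError("unreachable")


# Demonstration with a simulated failure; delays are recorded instead of slept
delays: List[float] = []
calls = {"n": 0}


def flaky_download() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated network failure")
    return "ok"


outcome = retry_with_backoff(flaky_download, sleep=delays.append)
```

Passing `sleep` as a parameter keeps the helper testable without real waiting.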
Examples
>>> # Create service and run all acquisitions
>>> from symfluence.data.acquisition.acquisition_service import AcquisitionService
>>> acq = AcquisitionService(config, logger, reporting_manager=reporter)
>>> acq.acquire_attributes()
>>> acq.acquire_forcings()
>>> acq.acquire_observations()
>>> acq.acquire_em_earth_forcings()
>>> # Cloud-only mode (faster for small domains)
>>> # Set config.domain.data_access = 'CLOUD'
>>> acq.acquire_attributes()
>>> # MAF mode (for large domains on HPC)
>>> # Set config.domain.data_access = 'MAF'
>>> acq.acquire_attributes()
References
MERIT-Hydro: Yamazaki et al. (2019) Water Resources Research
Copernicus DEM: https://copernicus-dem-30m.s3.amazonaws.com/
FABDEM: Hawker et al. (2022) Environmental Research Letters
SoilGrids: Poggio et al. (2021) SOIL
MODIS: Justice et al. (2002) Remote Sensing of Environment
- class symfluence.data.acquisition.acquisition_service.AcquisitionService(config, logger, reporting_manager=None)[source]#
Bases:
ConfigurableMixin
Unified data acquisition service for all SYMFLUENCE data needs.
High-level facade orchestrating geospatial attributes, forcing data, and observation data acquisition from multiple sources (cloud, HPC, local). Provides flexible acquisition modes (CLOUD vs MAF) and handles caching, error recovery, and visualization.
- Acquisition Modes:
CLOUD Mode:
- Direct HTTP/S3 access to cloud providers
- Faster for small domains, requires internet access
- DEM sources: Copernicus GLO-30/90, FABDEM, NASADEM, SRTM, ETOPO, Mapzen, ALOS
- Forcing: ERA5 (CDS), CARRA/CERRA, AORC, NEX-GDDP
- Suitable for research, testing, small basins
MAF Mode:
- HPC-based via external MAF tools (gistool, datatool)
- Better for large domains, requires HPC access
- Same output format as CLOUD mode
- Handles job queuing and monitoring via Slurm
- Suitable for operational, large-scale applications
- Data Acquisition Methods:
acquire_attributes(): Geospatial attributes (DEM, soil, landcover)
acquire_forcings(): Meteorological forcing data (ERA5, CARRA, etc.)
acquire_observations(): Validation data (streamflow, GRACE, SNOTEL, etc.)
acquire_em_earth_forcings(): Supplementary forcing from EM-Earth
- Key Features:
Multi-source geospatial data with automatic fallbacks
Caching with TTL and checksum-based validation
Parallel downloading where supported
Progress visualization via reporting_manager
Comprehensive error handling and logging
Configuration-driven mode selection
- Parameters:
config (SymfluenceConfig | Dict[str, Any])
logger (Logger)
reporting_manager (Any)
- config#
Typed SymfluenceConfig instance
- logger#
Logger for acquisition progress tracking
- data_dir#
Root data directory (from config.system.data_dir)
- domain_name#
Domain identifier (from config.domain.name)
- project_dir#
Project-specific directory (data_dir/domain_{domain_name})
- reporting_manager#
Optional visualization manager
- variable_handler#
VariableHandler for dataset-specific unit conversion
- Configuration:
domain.data_access: ‘CLOUD’ or ‘MAF’ (default: ‘MAF’)
domain.dem_source: DEM provider (‘merit_hydro’, ‘copernicus’, ‘copdem90’, ‘fabdem’, ‘nasadem’, ‘srtm’, ‘etopo’, ‘mapzen’, ‘alos’)
domain.land_class_source: Land cover provider (‘modis’, ‘usgs_nlcd’)
domain.download_dem: Enable DEM acquisition (default: True)
domain.download_soil: Enable soil class (default: True)
domain.download_landcover: Enable land cover (default: True)
Examples
>>> # Create service with config and logger
>>> acq = AcquisitionService(config, logger, reporting_manager=reporter)
>>> # Run complete acquisition workflow
>>> acq.acquire_attributes()  # DEM, soil, landcover
>>> acq.acquire_forcings()  # ERA5, CARRA, etc.
>>> acq.acquire_observations()  # Streamflow, GRACE, etc.
>>> acq.acquire_em_earth_forcings()  # Supplementary data
>>> # Cloud-only mode (small domain)
>>> config.domain.data_access = 'CLOUD'
>>> acq.acquire_attributes()
>>> # MAF mode (large domain on HPC)
>>> config.domain.data_access = 'MAF'
>>> acq.acquire_forcings()
See also
CloudForcingDownloader: Cloud-based data source handlers
gistoolRunner: HPC geospatial data extraction
datatoolRunner: HPC forcing data extraction
RawForcingCache: Forcing data caching system
- acquire_attributes()[source]#
Acquire geospatial attributes including DEM, soil, and land cover data.
Available Data Sources:
# Forcing datasets
forcing_sources = [
    'ERA5',       # ECMWF reanalysis
    'ERA5-Land',  # High-resolution land reanalysis
    'RDRS',       # Regional Deterministic Reforecast System
    'CARRA',      # Copernicus Arctic Regional Reanalysis
    'AORC',       # Analysis of Record for Calibration
    'CONUS404',   # CONUS 404 dataset
    'HRRR',       # High-Resolution Rapid Refresh
    'EM-Earth',   # EM-Earth reanalysis
    'NEX-GDDP',   # NASA climate projections
]
# Observation datasets
obs_sources = [
    'USGS',   # US Geological Survey streamflow
    'WSC',    # Water Survey of Canada
    'GRDC',   # Global Runoff Data Centre
    'MODIS',  # Remote sensing products
    'GRACE',  # Gravity recovery data
]
Acquisition Handlers#
from symfluence.data.acquisition import AcquisitionRegistry
# Get available handlers
handlers = AcquisitionRegistry.list_handlers()
# Get specific handler
era5_handler = AcquisitionRegistry.get_handler('ERA5')
Geospatial Operations#
Domain Discretization#
Domain discretization core module for Hydrologic Response Unit (HRU) creation.
Provides the DomainDiscretizer class for subdividing catchments into HRUs based on elevation bands, soil classes, land cover, aspect, or radiation.
- class symfluence.geospatial.discretization.core.DomainDiscretizer(config, logger)[source]#
Bases:
PathResolverMixin
A class for discretizing a domain into Hydrologic Response Units (HRUs).
This class provides methods for various types of domain discretization, including elevation-based, soil class-based, land class-based, and radiation-based discretization. HRUs are allowed to be MultiPolygons, meaning spatially disconnected areas with the same attributes are grouped into single HRUs.
- config#
Configuration dictionary.
- Type:
Dict[str, Any]
- logger#
Logger object for logging information and errors.
- root_path#
Root path for the project.
- Type:
Path
- domain_name#
Name of the domain being processed.
- Type:
str
- project_dir#
Directory for the current project.
- Type:
Path
- sort_catchment_shape()[source]#
Sort the catchment shapefile based on GRU and HRU IDs.
This method performs the following steps:
1. Loads the catchment shapefile
2. Sorts the shapefile based on GRU and HRU IDs
3. Saves the sorted shapefile back to the original location
The method uses GRU and HRU ID column names specified in the configuration.
- Raises:
FileNotFoundError – If the catchment shapefile is not found.
ValueError – If the required ID columns are not present in the shapefile.
- discretize_domain()[source]#
Discretize domain into Hydrologic Response Units (HRUs).
Creates HRUs by subdividing the catchment based on specified attributes. Supports multiple discretization methods that can be combined:
- Single-Attribute Methods:
‘lumped’: Single HRU for entire catchment
‘elevation’: HRUs based on elevation bands
‘landclass’: HRUs based on land cover classes
‘soilclass’: HRUs based on soil type classes
‘aspect’: HRUs based on aspect classes
‘radiation’: HRUs based on potential radiation
- Multi-Attribute Methods:
‘elevation,landclass’: Combination of elevation and land cover
‘elevation,soilclass’: Combination of elevation and soil type
Any comma-separated combination of attributes
- Process:
Check for existing custom catchment shapefile
Load or create base catchment geometry
Apply discretization method(s)
Generate GRU (Grouped Response Unit) and HRU IDs
Calculate HRU statistics and attributes
Save shapefile with discretization results
- Returns:
Path to generated HRU shapefile, or None if using existing shapefile
- Raises:
ValueError – If discretization method not recognized
FileNotFoundError – If required input files (DEM, land cover, etc.) not found
Exception – If discretization process fails
- Return type:
Path | None
Note
HRUs can be MultiPolygons (spatially disconnected areas with same attributes)
Minimum HRU size controlled by MIN_HRU_SIZE config parameter
Output shapefile includes: geometry, GRU_ID, HRU_ID, area, and attribute values
Files saved to: {project_dir}/shapefiles/catchment/{domain_name}_HRUs_{method}.shp
Example
For SUB_GRID_DISCRETIZATION="elevation,landclass", creates HRUs by:
1. Dividing catchment into elevation bands
2. Within each elevation band, subdividing by land cover class
3. Merging small HRUs below MIN_HRU_SIZE threshold
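The three steps in this example can be sketched on plain tuples instead of geometries. This is an illustration under stated assumptions: the `discretize` function and the sample cells are hypothetical, and the merge rule (fold a small HRU into the largest HRU of the same elevation band) is a guess at one reasonable policy, since the reference does not say how small HRUs are merged.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Cell = Tuple[float, str, float]  # (elevation in m, land cover class, area in km^2)
HruKey = Tuple[int, str]  # (elevation band index, land cover class)


def discretize(cells: List[Cell], band_size: float, min_hru_size: float) -> Dict[HruKey, float]:
    """Group cells into (band, land class) HRUs and merge undersized ones."""
    areas: Dict[HruKey, float] = defaultdict(float)
    for elevation, land_class, area in cells:
        band = int(elevation // band_size)  # step 1: elevation band
        areas[(band, land_class)] += area  # step 2: split by land class
    # Step 3 (assumed rule): fold each small HRU into the largest HRU of its band
    for key in sorted(k for k, a in areas.items() if a < min_hru_size):
        if key not in areas or areas[key] >= min_hru_size:
            continue  # already grown past the threshold by an earlier merge
        others = [k for k in areas if k[0] == key[0] and k != key]
        if not others:
            continue  # nothing to merge with; keep the small HRU
        target = max(others, key=lambda k: areas[k])
        areas[target] += areas.pop(key)
    return dict(areas)


# Made-up cells for illustration
sample_cells: List[Cell] = [
    (120.0, "forest", 4.0),
    (180.0, "forest", 3.0),
    (150.0, "grass", 0.2),
    (620.0, "grass", 5.0),
    (640.0, "rock", 0.1),
]
hru_areas = discretize(sample_cells, band_size=500.0, min_hru_size=0.5)
```

Total area is conserved by the merge, which is a useful invariant to check in any real implementation as well.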
- class symfluence.geospatial.discretization.core.DomainDiscretizationRunner(config, logger)[source]#
Bases:
object
Wraps domain discretization with explicit artifact tracking.
- Parameters:
config (Dict[str, Any])
logger (Any)
Discretization Methods:
# Available discretization approaches
methods = [
    'lumped',     # Single unit
    'GRUs',       # Grouped Response Units
    'elevation',  # Elevation bands
    'radiation',  # Radiation-based
    'combined',   # Multiple criteria
]
Evaluation#
Evaluators#
Base Model Evaluator
This module provides the abstract base class for different evaluation variables.
- class symfluence.evaluation.evaluators.base.ModelEvaluator(config, project_dir=None, logger=None)[source]#
Bases:
ConfigurableMixin, ABC
Abstract base class for hydrological model evaluation.
Provides standardized infrastructure for comparing simulated and observed data across different hydrological variables (streamflow, snow, ET, etc.). Handles time series alignment, period-based evaluation (calibration/validation), and multi-metric calculation using the centralized metrics module.
- Subclasses must implement:
get_simulation_files(): Locate model output files
extract_simulated_data(): Parse simulation results
get_observed_data_path(): Locate observation files
needs_routing(): Whether mizuRoute output is required
_get_observed_data_column(): Identify data column in obs files
- Parameters:
config (SymfluenceConfig)
project_dir (Path | None)
logger (Logger | None)
- config#
SymfluenceConfig instance with typed access
- calibration_period#
Tuple of (start, end) timestamps for calibration
- evaluation_period#
Tuple of (start, end) timestamps for validation
- eval_timestep#
Target timestep for comparison (‘native’, ‘hourly’, ‘daily’)
- property variable_type: str#
Return the variable type for resampling behavior.
Override in subclasses for flux variables (precipitation, ET) that should use sum aggregation instead of mean.
- Returns:
‘state’ (default) for state variables - use mean aggregation
‘flux’ for flux/accumulation variables - use sum aggregation
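The difference between the two aggregation modes can be shown with a small standard-library sketch. `to_daily` is a hypothetical helper, not the evaluator's resampling code, and it assumes flux values are per-timestep amounts so that summing them gives a daily total.

```python
from collections import defaultdict
from datetime import date, datetime, timedelta
from typing import Dict, List, Tuple


def to_daily(series: List[Tuple[datetime, float]], variable_type: str = "state") -> Dict[date, float]:
    """Aggregate a sub-daily series to daily values: mean for 'state', sum for 'flux'."""
    if variable_type not in ("state", "flux"):
        raise ValueError("variable_type must be 'state' or 'flux'")
    buckets: Dict[date, List[float]] = defaultdict(list)
    for timestamp, value in series:
        buckets[timestamp.date()].append(value)
    if variable_type == "flux":
        return {day: sum(vals) for day, vals in buckets.items()}
    return {day: sum(vals) / len(vals) for day, vals in buckets.items()}


# 24 hourly values of 1.0 on a single day
start = datetime(2020, 1, 1)
hourly = [(start + timedelta(hours=h), 1.0) for h in range(24)]
daily_mean = to_daily(hourly, "state")
daily_sum = to_daily(hourly, "flux")
```

With identical input, the state aggregation yields 1.0 for the day and the flux aggregation yields 24.0, which is why a subclass for precipitation or ET would override `variable_type`.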
- evaluate(sim, obs=None, mizuroute_dir=None, calibration_only=True)[source]#
Alias for calculate_metrics for consistency with other parts of the system
- Parameters:
sim (Any)
obs (pandas.Series | None)
mizuroute_dir (Path | None)
calibration_only (bool)
- Return type:
Dict[str, float] | None
- calculate_metrics(sim, obs=None, mizuroute_dir=None, calibration_only=True)[source]#
Calculate performance metrics for this target.
- Parameters:
sim (Any) – Either a Path to simulation directory or a pre-loaded pd.Series
obs (pandas.Series | None) – Optional pre-loaded pd.Series of observations. If None, loads from file.
mizuroute_dir (Path | None) – mizuRoute simulation directory (if needed and sim is Path)
calibration_only (bool) – If True, only calculate calibration period metrics
- Return type:
Dict[str, float] | None
- abstractmethod get_simulation_files(sim_dir)[source]#
Get relevant simulation output files for this target
- Parameters:
sim_dir (Path)
- Return type:
List[Path]
- abstractmethod extract_simulated_data(sim_files, **kwargs)[source]#
Extract simulated data from output files
- Parameters:
sim_files (List[Path])
- Return type:
pandas.Series
Available Evaluators:
from symfluence.evaluation.evaluators import (
    StreamflowEvaluator,
    ETEvaluator,
    SnowEvaluator,
    SoilMoistureEvaluator,
    GroundwaterEvaluator,
    TWSEvaluator,
)
# Initialize evaluator
evaluator = StreamflowEvaluator(config, project_dir, logger)
# Evaluate simulation
metrics = evaluator.evaluate(sim_dir)
# Returns: {'KGE': 0.85, 'NSE': 0.82, 'RMSE': 12.5, ...}
Metrics#
# Available metrics
metrics = [
    'KGE',    # Kling-Gupta Efficiency
    'KGEnp',  # Non-parametric KGE
    'NSE',    # Nash-Sutcliffe Efficiency
    'RMSE',   # Root Mean Square Error
    'MAE',    # Mean Absolute Error
    'PBIAS',  # Percent Bias
    'R2',     # Coefficient of Determination
]
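For reference, several of the metrics listed above can be written in a few lines of plain Python. These are the standard textbook definitions (KGE in its 2009 form), offered as a sketch; SYMFLUENCE's centralized metrics module may differ in details such as missing-value handling or the PBIAS sign convention, which varies between sources.

```python
import math
from typing import Sequence


def _mean(x: Sequence[float]) -> float:
    return sum(x) / len(x)


def rmse(sim: Sequence[float], obs: Sequence[float]) -> float:
    return math.sqrt(_mean([(s - o) ** 2 for s, o in zip(sim, obs)]))


def nse(sim: Sequence[float], obs: Sequence[float]) -> float:
    mo = _mean(obs)
    num = sum((s - o) ** 2 for s, o in zip(sim, obs))
    den = sum((o - mo) ** 2 for o in obs)
    return 1.0 - num / den


def pbias(sim: Sequence[float], obs: Sequence[float]) -> float:
    # Convention used here: positive means the simulation overestimates
    return 100.0 * sum(s - o for s, o in zip(sim, obs)) / sum(obs)


def kge(sim: Sequence[float], obs: Sequence[float]) -> float:
    ms, mo = _mean(sim), _mean(obs)
    ss = math.sqrt(_mean([(s - ms) ** 2 for s in sim]))
    so = math.sqrt(_mean([(o - mo) ** 2 for o in obs]))
    cov = _mean([(s - ms) * (o - mo) for s, o in zip(sim, obs)])
    r = cov / (ss * so)  # linear correlation
    alpha = ss / so      # variability ratio
    beta = ms / mo       # bias ratio
    return 1.0 - math.sqrt((r - 1) ** 2 + (alpha - 1) ** 2 + (beta - 1) ** 2)


obs_example = [1.0, 2.0, 3.0, 4.0, 5.0]
```

NSE, KGE, and the correlation term are undefined when the observations (or, for KGE, the simulation) are constant; a production implementation needs to guard against the resulting division by zero.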
Reporting#
Reporting Manager#
Central reporting facade for coordinating all SYMFLUENCE visualizations.
Provides a unified interface for generating publication-ready visualizations across all modeling stages: domain setup, calibration, evaluation, and multi-model comparison. Implements the Facade pattern to orchestrate specialized plotters while hiding complexity from client code.
Heavy lifting is delegated to three orchestrators:
- ModelOutputOrchestrator: registry-based model output dispatch
- CalibrationOrchestrator: post-calibration target dispatch and comparison plots
- DiagnosticsOrchestrator: per-workflow-step diagnostic validation plots
- class symfluence.reporting.reporting_manager.ReportingManager(config, logger, visualize=False, diagnostic=False)[source]#
Bases:
ConfigMixin
Central facade coordinating all visualization and reporting in SYMFLUENCE.
Orchestrates diverse visualization workflows by delegating to specialized plotters for domain maps, calibration analysis, performance benchmarking, and diagnostics. Uses Facade and Lazy Initialization patterns.
Example
>>> rm = ReportingManager(config, logger, visualize=True)
>>> rm.plot_domain()  # Generate domain overview map
>>> rm.plot_calibration()  # Plot calibration convergence
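The Facade and Lazy Initialization patterns mentioned above can be sketched with `functools.cached_property`. The classes below are hypothetical stand-ins, and the idea that the `visualize` flag short-circuits plotting is an assumption made for the illustration, not documented behavior of `ReportingManager`.

```python
from functools import cached_property
from typing import Optional


class SketchPlotter:
    """Hypothetical plotter that records how many times it was constructed."""

    instances = 0

    def __init__(self) -> None:
        SketchPlotter.instances += 1


class SketchReportingFacade:
    """Hypothetical facade: plotters are created on first access only."""

    def __init__(self, visualize: bool = False) -> None:
        self.visualize = visualize

    @cached_property
    def domain_plotter(self) -> SketchPlotter:
        return SketchPlotter()  # built once, then cached on the instance

    def plot_domain(self) -> Optional[SketchPlotter]:
        if not self.visualize:
            return None  # skip work entirely when visualization is disabled
        return self.domain_plotter


facade = SketchReportingFacade(visualize=True)
before = SketchPlotter.instances
first = facade.plot_domain()
second = facade.plot_domain()
```

Lazy construction keeps heavy plotting dependencies from being loaded in runs that never produce a figure.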
- Parameters:
config (SymfluenceConfig)
logger (Any)
visualize (bool)
diagnostic (bool)
- __init__(config, logger, visualize=False, diagnostic=False)[source]#
Initialize the ReportingManager.
- Parameters:
config (SymfluenceConfig) – SymfluenceConfig instance.
logger (Any) – Logger instance.
visualize (bool) – Boolean flag indicating if visualization is enabled.
diagnostic (bool) – Boolean flag indicating if diagnostic mode is enabled.
- property plot_config: PlotConfig#
Lazy initialization of plot configuration.
- property data_processor: DataProcessor#
Lazy initialization of data processor.
- property spatial_processor: SpatialProcessor#
Lazy initialization of spatial processor.
- property domain_plotter: DomainPlotter#
Lazy initialization of domain plotter.
- property optimization_plotter: OptimizationPlotter#
Lazy initialization of optimization plotter.
- property analysis_plotter: AnalysisPlotter#
Lazy initialization of analysis plotter.
- property benchmark_plotter: BenchmarkPlotter#
Lazy initialization of benchmark plotter.
- property snow_plotter: SnowPlotter#
Lazy initialization of snow plotter.
- property diagnostic_plotter: DiagnosticPlotter#
Lazy initialization of diagnostic plotter.
- property model_comparison_plotter: ModelComparisonPlotter#
Lazy initialization of model comparison plotter.
- property forcing_comparison_plotter: ForcingComparisonPlotter#
Lazy initialization of forcing comparison plotter.
- property workflow_diagnostic_plotter: WorkflowDiagnosticPlotter#
Lazy initialization of workflow diagnostic plotter.
- visualize_data_distribution(data, variable_name, stage)[source]#
Visualize data distribution (histogram/boxplot).
- Parameters:
data (Any)
variable_name (str)
stage (str)
- Return type:
None
- visualize_spatial_coverage(raster_path, variable_name, stage)[source]#
Visualize spatial coverage of raster data.
- Parameters:
raster_path (Path)
variable_name (str)
stage (str)
- Return type:
None
- visualize_forcing_comparison(raw_forcing_file, remapped_forcing_file, forcing_grid_shp, hru_shp, variable='precipitation_flux', time_index=0)[source]#
Visualize raw vs. remapped forcing data comparison.
- Parameters:
raw_forcing_file (Path)
remapped_forcing_file (Path)
forcing_grid_shp (Path)
hru_shp (Path)
variable (str)
time_index (int)
- Return type:
str | None
- update_sim_reach_id(config_path=None)[source]#
Update the SIM_REACH_ID in both the config object and YAML file.
- Parameters:
config_path (str | None)
- Return type:
int | None
- visualize_discretized_domain(discretization_method)[source]#
Visualize the discretized domain (HRUs/GRUs).
- Parameters:
discretization_method (str)
- Return type:
str | None
- visualize_model_outputs(model_outputs, obs_files)[source]#
Visualize model outputs (streamflow comparison).
- Parameters:
model_outputs (List[Tuple[str, str]])
obs_files (List[Tuple[str, str]])
- Return type:
str | None
- visualize_lumped_model_outputs(model_outputs, obs_files)[source]#
Visualize lumped model outputs.
- Parameters:
model_outputs (List[Tuple[str, str]])
obs_files (List[Tuple[str, str]])
- Return type:
str | None
- visualize_fuse_outputs(model_outputs, obs_files)[source]#
Visualize FUSE model outputs.
- Parameters:
model_outputs (List[Tuple[str, str]])
obs_files (List[Tuple[str, str]])
- Return type:
str | None
- visualize_summa_outputs(experiment_id)[source]#
Visualize SUMMA model outputs (all variables).
- Parameters:
experiment_id (str)
- Return type:
Dict[str, str]
- visualize_ngen_results(sim_df, obs_df, experiment_id, results_dir)[source]#
Visualize NGen streamflow plots.
- Parameters:
sim_df (Any)
obs_df (Any | None)
experiment_id (str)
results_dir (Path)
- Return type:
None
- visualize_lstm_results(results_df, obs_streamflow, obs_snow, use_snow, output_dir, experiment_id)[source]#
Visualize LSTM simulation results.
- Parameters:
results_df (Any)
obs_streamflow (Any)
obs_snow (Any)
use_snow (bool)
output_dir (Path)
experiment_id (str)
- Return type:
None
- visualize_hype_results(sim_flow, obs_flow, outlet_id, domain_name, experiment_id, project_dir)[source]#
Visualize HYPE streamflow comparison.
- Parameters:
sim_flow (Any)
obs_flow (Any)
outlet_id (str)
domain_name (str)
experiment_id (str)
project_dir (Path)
- Return type:
None
- visualize_model_results(model_name, **kwargs)[source]#
Visualize model results using registry-based dispatch.
- Parameters:
model_name (str)
- Return type:
Any | None
- visualize_timeseries_results()[source]#
Visualize timeseries results from the standard results file.
- Return type:
None
- visualize_benchmarks(benchmark_results)[source]#
Visualize benchmark results.
- Parameters:
benchmark_results (Dict[str, Any])
- Return type:
List[str]
- visualize_snow_comparison(model_outputs)[source]#
Visualize snow comparison.
- Parameters:
model_outputs (List[List[str]])
- Return type:
Dict[str, Any]
- visualize_optimization_progress(history, output_dir, calibration_variable, metric)[source]#
Visualize optimization progress.
- Parameters:
history (List[Dict])
output_dir (Path)
calibration_variable (str)
metric (str)
- Return type:
None
- visualize_optimization_depth_parameters(history, output_dir)[source]#
Visualize depth parameter evolution.
- Parameters:
history (List[Dict])
output_dir (Path)
- Return type:
None
- visualize_sensitivity_analysis(sensitivity_data, output_file, plot_type='single')[source]#
Visualize sensitivity analysis results.
- Parameters:
sensitivity_data (Any)
output_file (Path)
plot_type (str)
- Return type:
None
- visualize_decision_impacts(results_file, output_folder)[source]#
Visualize decision analysis impacts.
- Parameters:
results_file (Path)
output_folder (Path)
- Return type:
None
- visualize_hydrographs_with_highlight(results_file, simulation_results, observed_streamflow, decision_options, output_folder, metric='kge')[source]#
Visualize hydrographs with top performers highlighted.
- Parameters:
results_file (Path)
simulation_results (Dict)
observed_streamflow (Any)
decision_options (Dict)
output_folder (Path)
metric (str)
- Return type:
None
- visualize_drop_analysis(drop_data, optimal_threshold, project_dir)[source]#
Visualize drop analysis for stream threshold selection.
- Parameters:
drop_data (List[Dict])
optimal_threshold (float)
project_dir (Path)
- Return type:
None
- generate_model_comparison_overview(experiment_id=None, context='run_model')[source]#
Generate model comparison overview for all models with valid output.
- Parameters:
experiment_id (str | None)
context (str)
- Return type:
str | None
- visualize_calibration_results(experiment_id=None, calibration_target=None)[source]#
Generate comprehensive post-calibration visualizations.
- Parameters:
experiment_id (str | None)
calibration_target (str | None)
- Return type:
Dict[str, str]
- diagnostic_domain_definition(basin_gdf, dem_path=None)[source]#
Generate diagnostic plots for domain definition step.
- Parameters:
basin_gdf (Any)
dem_path (Path | None)
- Return type:
str | None
- diagnostic_discretization(hru_gdf, method)[source]#
Generate diagnostic plots for discretization step.
- Parameters:
hru_gdf (Any)
method (str)
- Return type:
str | None
- diagnostic_observations(obs_df, obs_type)[source]#
Generate diagnostic plots for observation processing step.
- Parameters:
obs_df (Any)
obs_type (str)
- Return type:
str | None
- diagnostic_forcing_raw(forcing_nc, domain_shp=None)[source]#
Generate diagnostic plots for raw forcing acquisition step.
- Parameters:
forcing_nc (Path)
domain_shp (Path | None)
- Return type:
str | None
- diagnostic_forcing_remapped(raw_nc, remapped_nc, hru_shp=None)[source]#
Generate diagnostic plots for forcing remapping step.
- Parameters:
raw_nc (Path)
remapped_nc (Path)
hru_shp (Path | None)
- Return type:
str | None
- diagnostic_model_preprocessing(input_dir, model_name)[source]#
Generate diagnostic plots for model preprocessing step.
- Parameters:
input_dir (Path)
model_name (str)
- Return type:
str | None
- diagnostic_model_output(output_nc, model_name)[source]#
Generate diagnostic plots for model output step.
- Parameters:
output_nc (Path)
model_name (str)
- Return type:
str | None
- diagnostic_attributes(dem_path=None, soil_path=None, land_path=None)[source]#
Generate diagnostic plots for attribute acquisition step.
- Parameters:
dem_path (Path | None)
soil_path (Path | None)
land_path (Path | None)
- Return type:
str | None
- diagnostic_calibration(history=None, best_params=None, obs_vs_sim=None, model_name='Unknown')[source]#
Generate diagnostic plots for calibration step.
- Parameters:
history (List[Dict] | None)
best_params (Dict[str, float] | None)
obs_vs_sim (Dict[str, Any] | None)
model_name (str)
- Return type:
str | None
Visualization Methods:
from symfluence.reporting.reporting_manager import ReportingManager
rm = ReportingManager(config, logger)
# Generate domain map
rm.plot_domain_map()
# Generate hydrograph
rm.plot_hydrograph(observed, simulated)
# Generate calibration convergence plot
rm.plot_calibration_convergence(results)
# Generate sensitivity analysis plot
rm.visualize_sensitivity_analysis(sensitivity_results)
Configuration#
SymfluenceConfig#
Configuration management facade for SYMFLUENCE.
Provides high-level access to configuration loading, normalization, validation, and type-safe configuration models. Acts as the public API for external code and CLI commands that need to work with SYMFLUENCE configurations.
- Exports:
SymfluenceConfig: Type-safe configuration model with hierarchical access
ensure_typed_config: Adapter to convert dict configs to SymfluenceConfig
normalize_config: Normalize and apply aliases to configuration dictionaries
validate_config: Validate configuration against schema
- class symfluence.core.config.SymfluenceConfig(*args, **kwargs)[source]#
Bases: BaseModel
Hierarchical root configuration model for SYMFLUENCE.
Organizes 346+ configuration parameters into logical nested sections:
- system: System settings (paths, logging, MPI)
- domain: Domain definition (timing, spatial extent, discretization)
- forcing: Meteorological forcing data
- model: Hydrological model configurations
- optimization: Calibration and optimization settings
- evaluation: Evaluation data and analysis
- paths: File paths and directories
Features:
- Type-safe hierarchical access: config.domain.name vs config['DOMAIN_NAME']
- Factory methods: from_preset(), from_minimal(), from_file()
- Backward compatibility: to_dict(), get(), __getitem__()
- Immutable after creation (frozen=True) to prevent mutation bugs
- All validation logic preserved from original flat model
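The combination of nested attribute access, legacy bracket access, and immutability described above can be illustrated with a small stand-in. This is only a sketch built on standard-library dataclasses, not the SymfluenceConfig implementation (which is a Pydantic BaseModel); the names SketchConfig, DomainSection, and _FLAT_KEYS are hypothetical.

```python
from dataclasses import dataclass, FrozenInstanceError


@dataclass(frozen=True)
class DomainSection:
    """Stand-in for one nested section of the configuration."""
    name: str


@dataclass(frozen=True)
class SketchConfig:
    """Stand-in root model: nested sections plus legacy flat-key access."""
    domain: DomainSection

    # Hypothetical mapping from legacy flat keys to nested attribute paths.
    # (Unannotated, so the dataclass machinery does not treat it as a field.)
    _FLAT_KEYS = {"DOMAIN_NAME": ("domain", "name")}

    def __getitem__(self, key):
        path = self._FLAT_KEYS[key]  # unknown keys raise KeyError
        value = self
        for part in path:
            value = getattr(value, part)
        return value


cfg = SketchConfig(domain=DomainSection(name="my_basin"))

# Both access styles resolve to the same value.
same_value = cfg.domain.name == cfg["DOMAIN_NAME"]

# Frozen instances reject attribute assignment after creation.
mutation_blocked = False
try:
    cfg.domain.name = "other_basin"
except FrozenInstanceError:
    mutation_blocked = True
```

Freezing the model means a setting cannot be changed halfway through a workflow by accident; a modified configuration has to be built as a new object.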
- Parameters:
args (Any)
kwargs (Any)
- Return type:
Any
- validate_time_periods()#
Validate that time periods make logical sense
- validate_coordinates()#
Validate coordinate formats and bounds
- validate_model_requirements()#
Validate model-specific required fields based on HYDROLOGICAL_MODEL.
Delegates to ModelRegistry for all model-specific validation.
- validate_spatial_mode_consistency()#
Validate and auto-align spatial modes with domain definition
- validate_optimization_configuration()#
Validate optimization algorithm and parameter settings
- to_dict(flatten=True)[source]#
Convert configuration to dictionary.
- Parameters:
flatten (bool) – If True, returns a flat dict with uppercase keys (legacy format). If False, returns a nested dict structure.
- Returns:
Configuration as dictionary
- Return type:
Dict[str, Any]
Example
>>> config = SymfluenceConfig.from_preset('fuse-basic')
>>> flat_dict = config.to_dict(flatten=True)
>>> flat_dict['DOMAIN_NAME']
'my_basin'
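To make the difference between the flat and nested forms concrete, here is a minimal sketch of flattening a nested dictionary into uppercase keys. It assumes keys are formed by joining the section path with underscores, which reproduces DOMAIN_NAME from domain.name but is not necessarily how the library maps every key; flatten_upper and the forcing.dataset entry are hypothetical.

```python
from typing import Any, Dict


def flatten_upper(nested: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into one level with UPPERCASE, underscore-joined keys."""
    flat: Dict[str, Any] = {}
    for key, value in nested.items():
        flat_key = f"{prefix}_{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into sub-sections, carrying the path so far as the prefix.
            flat.update(flatten_upper(value, flat_key))
        else:
            flat[flat_key.upper()] = value
    return flat


# Illustrative nested structure (the 'forcing.dataset' entry is an assumption).
nested = {"domain": {"name": "my_basin"}, "forcing": {"dataset": "ERA5"}}
flat = flatten_upper(nested)
```

Here flat contains the keys DOMAIN_NAME and FORCING_DATASET. The real flat keys may follow an explicit alias table rather than pure path joining, so treat this only as a picture of the two shapes.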
- get(key, default=None)[source]#
Dict-like get method for backward compatibility.
Supports both flat keys (‘DOMAIN_NAME’) and dotted paths (‘domain.name’).
- Parameters:
key (str) – Configuration key (uppercase) or dotted path
default (Any) – Default value if key not found
- Returns:
Configuration value or default
- Return type:
Any
Example
>>> config.get('DOMAIN_NAME')
'my_basin'
>>> config.get('NONEXISTENT', 'fallback')
'fallback'
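A lookup that accepts both key styles could work roughly as follows. This is a sketch under the assumption that flat keys are checked first and dotted paths are then walked through a nested dict; the lookup function and its arguments are hypothetical and do not describe the library's internals.

```python
from typing import Any, Dict


def lookup(flat: Dict[str, Any], nested: Dict[str, Any], key: str, default: Any = None) -> Any:
    """Resolve a legacy flat key or a dotted path, returning default if absent."""
    if key in flat:
        return flat[key]
    node: Any = nested
    for part in key.split("."):
        # Stop as soon as the path leaves the nested structure.
        if not isinstance(node, dict) or part not in node:
            return default
        node = node[part]
    return node


flat = {"DOMAIN_NAME": "my_basin"}
nested = {"domain": {"name": "my_basin"}}

by_flat_key = lookup(flat, nested, "DOMAIN_NAME")
by_dotted_path = lookup(flat, nested, "domain.name")
missing = lookup(flat, nested, "NONEXISTENT", "fallback")
```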
- __getitem__(key)[source]#
Dict-like bracket access for backward compatibility.
- Parameters:
key (str) – Configuration key (uppercase)
- Returns:
Configuration value
- Raises:
KeyError – If key not found
- Return type:
Any
Example
>>> config['DOMAIN_NAME']
'my_basin'
- __contains__(key)[source]#
Check if key exists in configuration.
- Parameters:
key (str)
- Return type:
bool
- __getattr__(name)[source]#
Provide attribute-style access for legacy flat keys.
- Parameters:
name (str)
- Return type:
Any
- classmethod from_file(path, overrides=None, *, use_env=True, validate=True)[source]#
Load configuration from YAML file with full 5-layer hierarchy.
Loading precedence (highest to lowest):
1. CLI overrides (programmatic)
2. Environment variables (SYMFLUENCE_*)
3. Config file (YAML)
4. Defaults from nested models
- Parameters:
path (Path) – Path to configuration YAML file
overrides (Dict[str, Any] | None) – Dictionary of CLI/programmatic overrides
use_env (bool) – Whether to load environment variables (default: True)
validate (bool) – Whether to validate using Pydantic (default: True)
- Returns:
Validated SymfluenceConfig instance
- Raises:
ConfigurationError – If configuration is invalid
FileNotFoundError – If config file is missing
- Return type:
SymfluenceConfig
Example
>>> config = SymfluenceConfig.from_file(
...     'config.yaml',
...     overrides={'DEBUG_MODE': True}
... )
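The precedence order listed for from_file() can be pictured as successive dictionary updates, lowest priority first. The sketch below is not the library's loader: resolve_layers is a hypothetical helper, and it assumes that an environment variable maps to a flat key by stripping the SYMFLUENCE_ prefix and that environment values stay as strings.

```python
import os
from typing import Any, Dict, Mapping, Optional

ENV_PREFIX = "SYMFLUENCE_"


def resolve_layers(
    defaults: Dict[str, Any],
    file_values: Dict[str, Any],
    overrides: Optional[Dict[str, Any]] = None,
    *,
    use_env: bool = True,
    environ: Optional[Mapping[str, str]] = None,
) -> Dict[str, Any]:
    """Merge configuration layers so that later layers win."""
    environ = os.environ if environ is None else environ
    merged = dict(defaults)              # 4. defaults (lowest priority)
    merged.update(file_values)           # 3. config file
    if use_env:
        for name, value in environ.items():
            if name.startswith(ENV_PREFIX):
                merged[name[len(ENV_PREFIX):]] = value  # 2. environment
    merged.update(overrides or {})       # 1. overrides (highest priority)
    return merged


defaults = {"DOMAIN_NAME": "default", "NUM_PROCESSES": 1, "DEBUG_MODE": False}
file_values = {"DOMAIN_NAME": "from_file", "NUM_PROCESSES": 4}
fake_env = {"SYMFLUENCE_NUM_PROCESSES": "8", "HOME": "/tmp"}

result = resolve_layers(defaults, file_values, {"DEBUG_MODE": True}, environ=fake_env)
```

In this example the file wins over the default for DOMAIN_NAME, the environment wins over the file for NUM_PROCESSES, and the override wins for DEBUG_MODE. Passing use_env=False skips the environment layer, mirroring the use_env parameter above.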
- classmethod from_preset(preset_name, **overrides)[source]#
Create configuration from a named preset.
- Parameters:
preset_name (str) – Name of preset (‘fuse-provo’, ‘summa-basic’, etc.)
**overrides – Additional overrides to apply on top of preset
- Returns:
Fully validated SymfluenceConfig instance
- Return type:
SymfluenceConfig
Example
>>> config = SymfluenceConfig.from_preset(
...     'fuse-provo',
...     DOMAIN_NAME='my_basin',
...     EXPERIMENT_TIME_START='2020-01-01 00:00'
... )
- classmethod from_minimal(domain_name, model, forcing_dataset='ERA5', **overrides)[source]#
Create minimal viable configuration for quick setup.
Automatically applies sensible defaults based on model choice.
- Parameters:
domain_name (str) – Name for the domain/basin
model (str) – Hydrological model (‘SUMMA’, ‘FUSE’, ‘GR’, etc.)
forcing_dataset (str) – Forcing data source (default: ‘ERA5’)
**overrides – Additional configuration overrides
- Returns:
Validated SymfluenceConfig with minimal required fields
- Return type:
SymfluenceConfig
Example
>>> config = SymfluenceConfig.from_minimal(
...     domain_name='test_basin',
...     model='SUMMA',
...     POUR_POINT_COORDS='51.17/-115.57',
...     EXPERIMENT_TIME_START='2020-01-01 00:00',
...     EXPERIMENT_TIME_END='2020-12-31 23:00'
... )
- symfluence.core.config.ensure_config(config)[source]#
Convert dict to SymfluenceConfig if needed.
This function centralizes the config coercion pattern that was previously duplicated across 60+ files. It uses a runtime import to avoid circular dependency issues while providing type safety via TYPE_CHECKING.
- Parameters:
config (Dict[str, Any] | SymfluenceConfig) – Configuration as dict or SymfluenceConfig instance
- Returns:
SymfluenceConfig instance
- Raises:
TypeError – If config is neither a dict nor SymfluenceConfig
- Return type:
SymfluenceConfig
Example
>>> from symfluence.core.config import ensure_config
>>> cfg = ensure_config({'DOMAIN_NAME': 'test_domain', ...})
>>> isinstance(cfg, SymfluenceConfig)
True
- symfluence.core.config.coerce_config(config, strict=None, warn=True)[source]#
Convert dict to SymfluenceConfig if possible, with configurable fallback.
Similar to ensure_config but can return the original dict if conversion fails (e.g., for partial configs in tests). This maintains backward compatibility with code that may pass incomplete configuration dictionaries.
- Parameters:
config (Dict[str, Any] | SymfluenceConfig) – Configuration as dict or SymfluenceConfig instance
strict (bool) – If True, raise error instead of falling back to dict. If None, uses SYMFLUENCE_STRICT_CONFIG environment variable.
warn (bool) – If True (default), emit DeprecationWarning when falling back to dict. Set to False for tests or cases where dict fallback is intentional.
- Returns:
SymfluenceConfig if conversion succeeds, original dict otherwise (unless strict=True, which raises on failure)
- Raises:
TypeError – If strict=True and config cannot be converted
ValueError – If strict=True and config validation fails
- Return type:
SymfluenceConfig | Dict[str, Any]
Example
>>> # Full config converts to SymfluenceConfig
>>> cfg = coerce_config({'DOMAIN_NAME': 'test', ...})
>>>
>>> # Partial config in tests falls back to dict (with warning)
>>> partial = coerce_config({'some_key': 'value'})
>>> isinstance(partial, dict)
True
>>>
>>> # Strict mode raises instead of falling back
>>> coerce_config({'invalid': 'config'}, strict=True)  # Raises!
Note
The fallback behavior is deprecated and will be removed in a future version. Use ensure_config() for strict conversion, or pass warn=False if you intentionally need dict fallback (e.g., in tests).
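The fallback-versus-strict behavior described for coerce_config() can be sketched with a toy converter. This is an illustration of the pattern only: coerce_sketch and toy_convert are hypothetical, and the set of strings treated as "true" for SYMFLUENCE_STRICT_CONFIG is an assumption.

```python
import os
import warnings
from typing import Any, Callable, Dict, Optional


def coerce_sketch(
    config: Any,
    convert: Callable[[Dict[str, Any]], Any],
    strict: Optional[bool] = None,
    warn: bool = True,
) -> Any:
    """Try to convert a dict; fall back to the dict unless strict is set."""
    if not isinstance(config, dict):
        return config  # already converted: pass through unchanged
    if strict is None:
        # Assumed parsing of the environment switch.
        strict = os.environ.get("SYMFLUENCE_STRICT_CONFIG", "").lower() in {"1", "true", "yes"}
    try:
        return convert(config)
    except (TypeError, ValueError):
        if strict:
            raise
        if warn:
            warnings.warn("Falling back to dict config", DeprecationWarning, stacklevel=2)
        return config


def toy_convert(raw: Dict[str, Any]) -> tuple:
    """Toy converter that requires DOMAIN_NAME."""
    if "DOMAIN_NAME" not in raw:
        raise ValueError("DOMAIN_NAME is required")
    return ("typed", raw["DOMAIN_NAME"])


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    partial = coerce_sketch({"some_key": "value"}, toy_convert, strict=False)
fallback_warned = any(issubclass(w.category, DeprecationWarning) for w in caught)

try:
    coerce_sketch({"some_key": "value"}, toy_convert, strict=True)
    strict_raised = False
except ValueError:
    strict_raised = True
```

With strict=False the partial dict comes back unchanged and a DeprecationWarning is recorded; with strict=True the converter's error propagates to the caller.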
- symfluence.core.config.ensure_typed_config(config)[source]#
Ensure configuration is a SymfluenceConfig instance.
This adapter function converts dict configs to SymfluenceConfig if needed. Use this when interfacing with external code that may pass dict configs.
- Parameters:
config (Dict[str, Any] | SymfluenceConfig) – Configuration as dict or SymfluenceConfig
- Returns:
SymfluenceConfig instance
- Return type:
SymfluenceConfig
Example
>>> config = ensure_typed_config({'DOMAIN_NAME': 'test', ...})
>>> isinstance(config, SymfluenceConfig)
True
- symfluence.core.config.normalize_config(config)[source]#
Normalize configuration keys using aliases and perform type coercion.
- Parameters:
config (Dict[str, Any]) – Dictionary of configuration settings
- Returns:
New dictionary with normalized keys and coerced values
- Return type:
Dict[str, Any]
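Alias normalization with type coercion might look like the sketch below. The alias table, the coercion rules, and the names normalize_sketch and coerce_value are all hypothetical; the real normalize_config() may recognize different aliases and coerce more types.

```python
from typing import Any, Dict

# Hypothetical alias table: alternative spellings mapped to canonical keys.
ALIASES = {"MODEL": "HYDROLOGICAL_MODEL", "N_PROCS": "NUM_PROCESSES"}


def coerce_value(value: Any) -> Any:
    """Turn 'true'/'false' and integer-looking strings into bool/int."""
    if not isinstance(value, str):
        return value
    lowered = value.strip().lower()
    if lowered in {"true", "false"}:
        return lowered == "true"
    try:
        return int(value)
    except ValueError:
        return value  # leave ordinary strings untouched


def normalize_sketch(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict with canonical uppercase keys and coerced values."""
    normalized: Dict[str, Any] = {}
    for key, value in config.items():
        canonical = ALIASES.get(key.upper(), key.upper())
        normalized[canonical] = coerce_value(value)
    return normalized


raw = {"model": "SUMMA", "n_procs": "8", "PARALLEL_CALIBRATION": "true"}
normalized = normalize_sketch(raw)
```

As in the documented function, the input dictionary is left unmodified and a new dictionary is returned.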
- symfluence.core.config.validate_config(config)[source]#
Validate configuration using Pydantic model.
- Parameters:
config (Dict[str, Any]) – Dictionary of configuration settings
- Returns:
Validated configuration dictionary
- Raises:
ValueError – If configuration is invalid
- Return type:
Dict[str, Any]
Loading and Using Configuration:
from symfluence.core.config import SymfluenceConfig, ensure_typed_config
# Load from file
config = SymfluenceConfig.from_file("config.yaml")
# From dictionary
config = SymfluenceConfig(**config_dict)
# Ensure typed config (for mixed dict/config inputs)
config = ensure_typed_config(maybe_dict_or_config)
# Access configuration values
domain = config.domain.name
model = config.model.hydrological_model
# Convert to dictionary
flat_dict = config.to_dict(flatten=True)
Utilities#
Path Management#
from symfluence.data.path_manager import PathManager
pm = PathManager(config)
# Access standard paths
project_dir = pm.project_dir
forcing_dir = pm.forcing_dir
simulations_dir = pm.simulations_dir
observations_dir = pm.observations_dir
Logging#
from symfluence.project.logging_manager import LoggingManager
# Initialize logging
log_mgr = LoggingManager(config)
logger = log_mgr.get_logger("my_module")
# Log messages
logger.info("Processing started")
logger.warning("Optional data not found")
logger.error("Critical failure")
Error Handling#
from symfluence import SYMFLUENCE
from symfluence.core.exceptions import (
    SymfluenceError,       # Base exception
    ConfigurationError,    # Config issues
    DataAcquisitionError,  # Data download failures
    ModelExecutionError,   # Model run failures
    ValidationError,       # Validation failures
)
# Catch specific exceptions first; SymfluenceError is the base class
try:
    conf = SYMFLUENCE("config.yaml")
    conf.run_workflow()
except ConfigurationError as e:
    print(f"Configuration problem: {e}")
except ModelExecutionError as e:
    print(f"Model failed: {e}")
except SymfluenceError as e:
    print(f"General error: {e}")
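The order of the except clauses above matters because Python uses the first clause that matches. The sketch below demonstrates this with local stand-in classes; it assumes the specific exceptions subclass SymfluenceError, as the "Base exception" comment suggests, and does not import the real symfluence.core.exceptions module.

```python
# Local stand-ins mirroring the assumed hierarchy (not the real classes).
class SymfluenceError(Exception):
    """Stand-in base exception."""


class ConfigurationError(SymfluenceError):
    """Stand-in for configuration problems."""


class ModelExecutionError(SymfluenceError):
    """Stand-in for model run failures."""


def classify(exc: Exception) -> str:
    """Report which except clause handles the given exception."""
    try:
        raise exc
    except ConfigurationError:
        return "configuration"
    except ModelExecutionError:
        return "model"
    except SymfluenceError:
        return "general"


def classify_base_first(exc: Exception) -> str:
    """Same handlers, but with the base class listed first."""
    try:
        raise exc
    except SymfluenceError:
        return "general"
    except ConfigurationError:  # never reached for subclasses of the base
        return "configuration"


specific = classify(ConfigurationError("missing key"))
shadowed = classify_base_first(ConfigurationError("missing key"))
```

With the base class listed first, the more specific handler is never reached, so the configuration error is reported as a general one.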
Advanced Usage#
Custom Workflow#
from symfluence import SYMFLUENCE
conf = SYMFLUENCE("config.yaml")
# Run subset of steps
conf.setup_project()
conf.define_domain()
# Skip to model execution (assumes data exists)
conf.preprocess_models()
conf.run_models()
# Custom post-processing
results = conf.postprocess_results()
# Access internal managers
model_mgr = conf.managers['model']
data_mgr = conf.managers['data']
Parallel Execution#
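# Configure in YAML
# NUM_PROCESSES: 8
# PARALLEL_CALIBRATION: true
# Or programmatically
from symfluence import SYMFLUENCE
from symfluence.core.config import SymfluenceConfig
# Start from a flat dict of settings (here loaded from an existing file)
config_dict = SymfluenceConfig.from_file("config.yaml").to_dict(flatten=True)
config_dict['NUM_PROCESSES'] = 8
config_dict['PARALLEL_CALIBRATION'] = True
conf = SYMFLUENCE(config_dict)
conf.calibrate_model()  # Uses parallel execution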
Batch Processing#
from symfluence import SYMFLUENCE
from pathlib import Path
# Process multiple domains
config_files = Path("configs/").glob("*.yaml")
for config_file in config_files:
    print(f"Processing {config_file.name}")
    conf = SYMFLUENCE(str(config_file))
    conf.run_workflow()
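In the loop above, one failing domain stops the whole batch. A possible variation, sketched here with a hypothetical run_batch helper and a fake runner in place of SYMFLUENCE, records each outcome and continues with the remaining configuration files.

```python
from pathlib import Path
from typing import Callable, Dict, Iterable


def run_batch(config_files: Iterable[Path], run_one: Callable[[Path], None]) -> Dict[str, str]:
    """Run every config file, recording success or failure per file."""
    outcomes: Dict[str, str] = {}
    # glob() order is not guaranteed, so sort for reproducible runs.
    for config_file in sorted(config_files):
        try:
            run_one(config_file)
            outcomes[config_file.name] = "ok"
        except Exception as exc:  # keep going; report the failure afterwards
            outcomes[config_file.name] = f"failed: {exc}"
    return outcomes


def fake_run(config_file: Path) -> None:
    """Fake runner used for the demonstration: one domain fails."""
    if config_file.name == "b.yaml":
        raise RuntimeError("bad domain")


outcomes = run_batch([Path("configs/b.yaml"), Path("configs/a.yaml")], fake_run)
```

In real use, run_one could wrap the two calls from the loop above, for example a function that builds SYMFLUENCE(str(path)) and calls run_workflow() on it.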
References#
Getting Started — High-level workflow tutorial
Configuration — Configuration parameter reference
Configuration — Configuration system usage
Developer Guide — Extending SYMFLUENCE
Examples — Example workflows and use cases