Source code for symfluence.geospatial.domain_manager

# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright (C) 2024-2026 SYMFLUENCE Team <dev@symfluence.org>

"""
Domain management facade for SYMFLUENCE geospatial operations.

Coordinates domain definition, delineation, and discretization workflows
with integrated visualization and artifact tracking.
"""
from __future__ import annotations

import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union

from symfluence.core.exceptions import GeospatialError, symfluence_error_handler
from symfluence.core.mixins import ConfigurableMixin
from symfluence.geospatial.delineation import (
    DelineationArtifacts,
    DomainDelineator,
    create_point_domain_shapefile,
)
from symfluence.geospatial.discretization import DiscretizationArtifacts, DomainDiscretizationRunner

if TYPE_CHECKING:
    from symfluence.core.config.models import SymfluenceConfig


[docs] class DomainManager(ConfigurableMixin): """ Orchestrates all geospatial domain operations for hydrological modeling setup. This manager coordinates domain definition (delineation), spatial discretization, and artifact tracking for hydrological modeling workflows. It provides a unified interface for creating HRU (Hydrologic Response Unit) configurations from various spatial data sources and discretization strategies. Architecture: Facade Pattern - Coordinates specialized geospatial services: - **DomainDelineator**: Watershed boundary extraction and network topology - **DomainDiscretizationRunner**: HRU creation via spatial disaggregation - **Artifact Tracking**: Maintains references to all created shapefiles - **Visualization Integration**: Optional reporting for QA/QC Domain Definition Methods: **point**: - Creates square bounding box domain from coordinates - Use case: FLUXNET sites, point-scale modeling - Output: Single polygon shapefile **lumped**: - Single-basin watershed delineation from pour point - Use case: Traditional lumped hydrological modeling - Output: Single watershed polygon + optional delineated routing network - Special: Supports lumped-to-distributed routing workflow - With subset_from_geofabric=True: Dissolves geofabric basins to single polygon **semidistributed**: - Full TauDEM-based watershed delineation from DEM - Use case: Detailed distributed modeling with subcatchments - Output: River network + subcatchment polygons - Optional: Coastal watershed handling - With subset_from_geofabric=True: Extracts from existing geofabric **distributed**: - Regular grid domain with D8 flow direction - Use case: Grid-based land surface models (VIC, MESH, CLM) - Output: Grid cells as both HRUs and routing segments - grid_source='generate': Create grid from bounding box - grid_source='native': Match forcing data resolution Discretization Strategies: **lumped**: - Single HRU representing entire basin - No spatial disaggregation **elevation**: - Elevation bands (e.g., 100m intervals) - Use case: Snow modeling, orographic effects **landclass**: - Land cover types (forest, urban, agriculture, etc.) - Use case: Land surface heterogeneity **soilclass**: - Soil classification types - Use case: Infiltration and runoff variability **aspect**: - Slope aspect classes (N, NE, E, SE, S, SW, W, NW) - Use case: Solar radiation and snow redistribution **radiation**: - Potential solar radiation classes - Use case: Energy balance modeling **combined**: - Multiple attributes combined (e.g., elevation × landclass) - Use case: Capturing complex spatial heterogeneity - Handles attribute interactions and MultiPolygons Workflow Sequence: 1. **define_domain()**: Create/extract watershed boundaries → Produces DelineationArtifacts (river basins, network, pour point) 2. **discretize_domain()**: Subdivide into HRUs → Produces DiscretizationArtifacts (HRU shapefile, attributes) 3. **Visualization** (optional): Spatial QA/QC plots → Generated via reporting_manager if available Artifact Tracking: DelineationArtifacts: - method: Domain definition method used - river_basins_path: Path to basin shapefile - river_network_path: Path to river network shapefile - pour_point_path: Path to pour point shapefile - metadata: Additional delineation metadata DiscretizationArtifacts: - method: Discretization method used - hru_shapefile_path: Path to HRU shapefile - attributes: HRU attributes DataFrame - statistics: Discretization statistics (HRU count, min/max areas) Configuration Dependencies: Domain Definition: - DOMAIN_DEFINITION_METHOD: point/lumped/semidistributed/distributed - DOMAIN_NAME: Basin identifier - SUBSET_FROM_GEOFABRIC: Extract from existing geofabric (default: False) - GRID_SOURCE (distributed): 'generate' or 'native' - NATIVE_GRID_DATASET (distributed + native): Dataset identifier (default: 'era5') - POUR_POINT_SHP_PATH (lumped/semidistributed): Pour point location - RIVER_NETWORK_SHP_PATH (subset): Existing river network - DOMAIN_BOUNDING_BOX: Bbox coordinates - GRID_CELL_SIZE (distributed): Grid spacing in meters Discretization: - SUB_GRID_DISCRETIZATION: Discretization method - DEM_PATH: Elevation data (for elevation/aspect/radiation) - LAND_CLASS_PATH: Land cover data (for landclass) - SOIL_CLASS_PATH: Soil data (for soilclass) - ELEVATION_BAND_SIZE: Band interval in meters (default: 100) Delineation (TauDEM): - DELINEATE_COASTAL_WATERSHEDS: Coastal handling (True/False) - ROUTING_DELINEATION: Routing network strategy Output Files: Shapefiles created in ``project_dir/shapefiles/``. Delineation outputs: river_basins, river_network, pour_point shapefiles. Discretization outputs: catchment HRU shapefiles. Example structure:: shapefiles/ ├── river_basins/ │ └── bow_river_riverBasins_lumped.shp ├── river_network/ │ └── bow_river_riverNetwork_lumped.shp ├── pour_point/ │ └── bow_river_pourPoint.shp └── catchment/ └── bow_river_HRUs_elevation.shp Special Workflows: Lumped-to-Distributed Routing: 1. Define lumped domain (single watershed polygon) 2. Internally delineate subcatchments within lumped domain 3. Create area-weighted remapping (lumped HRU to distributed routing) 4. Enables distributed routing with lumped hydrology Coastal Watershed Delineation: - Special handling for basins draining to ocean - Avoids river network artifacts at coastline - Uses modified TauDEM workflow Grid-Based Distributed: - Creates regular grid cells - Assigns D8 flow direction from DEM - Detects and fixes routing cycles - Each cell is both HRU and routing segment Visualization Integration: If reporting_manager available: - Delineation: Watershed boundary maps, river network plots - Discretization: HRU spatial distribution, attribute histograms - QA/QC: Identifies potential issues (small HRUs, disconnected polygons) Error Handling: - Validates configuration before execution - Raises descriptive errors for missing required shapefiles - Logs warnings for non-critical issues - Provides context for TauDEM failures Example Workflow: >>> from symfluence.geospatial.domain_manager import DomainManager >>> config = SymfluenceConfig.from_file('config.yaml') >>> logger = setup_logger() >>> reporting = ReportingManager(config, logger) >>> >>> # Initialize manager >>> domain_mgr = DomainManager(config, logger, reporting) >>> >>> # Define watershed boundaries >>> domain_mgr.define_domain() >>> print(domain_mgr.delineation_artifacts.river_basins_path) # ./shapefiles/river_basins/bow_river_riverBasins_lumped.shp >>> >>> # Discretize into elevation bands >>> domain_mgr.discretize_domain() >>> print(domain_mgr.discretization_artifacts.statistics) # {'hru_count': 8, 'min_area_km2': 120.5, 'max_area_km2': 450.2} Performance Considerations: - Delineation: ~1-30 minutes (depends on DEM resolution, TauDEM) - Discretization: ~10 seconds - 5 minutes (depends on attribute resolution) - Grid generation: ~1-10 minutes (depends on grid cell count) - Memory: Peak during raster operations (~2-8 GB for high-res DEMs) Notes: - DomainDelineator initialized eagerly - DomainDiscretizationRunner initialized lazily when needed - Artifacts tracked for downstream workflows (preprocessing, modeling) - Reporting integration provides visual validation - Supports both simple (lumped) and complex (combined attributes) setups See Also: - geospatial.delineation.DomainDelineator: Watershed delineation - geospatial.discretization.DomainDiscretizationRunner: HRU creation - geospatial.discretization.core.DomainDiscretizer: Discretization engine - geospatial.geofabric: Geofabric delineation backends """
[docs] def __init__(self, config: 'SymfluenceConfig', logger: logging.Logger, reporting_manager: Optional[Any] = None): """ Initialize the Domain Manager. Args: config: SymfluenceConfig instance logger: Logger instance reporting_manager: ReportingManager instance Raises: TypeError: If config is not a SymfluenceConfig instance """ # Import here to avoid circular imports at module level from symfluence.core.config.models import SymfluenceConfig if not isinstance(config, SymfluenceConfig): raise TypeError( f"config must be SymfluenceConfig, got {type(config).__name__}. " "Use SymfluenceConfig.from_file() to load configuration." ) # Set config via the ConfigMixin property self._config = config self.logger = logger self.reporting_manager = reporting_manager # Initialize domain workflows self.domain_delineator = DomainDelineator(config, self.logger, self.reporting_manager) self.domain_discretizer = None # Initialized when needed self.delineation_artifacts: Optional[DelineationArtifacts] = None self.discretization_artifacts: Optional[DiscretizationArtifacts] = None # Create point domain shapefile if method is 'point' domain_method = self._get_config_value( lambda: self.config.domain.definition_method ) if domain_method == 'point': self.create_point_domain_shapefile()
[docs] def create_point_domain_shapefile(self) -> Optional[Path]: """ Create a square basin shapefile from bounding box coordinates for point modelling. This method creates a rectangular polygon from the BOUNDING_BOX_COORDS and saves it as a shapefile for point-based modelling approaches. Returns: Path to the created shapefile or None if failed """ return create_point_domain_shapefile(self.config, self.logger)
[docs] def define_domain( self, ) -> Tuple[Optional[Union[Path, Tuple[Path, Path]]], DelineationArtifacts]: """ Define the domain using the configured method. Returns: Tuple of the domain result and delineation artifacts """ domain_method = self._get_config_value( lambda: self.config.domain.definition_method ) self.logger.debug(f"Domain definition workflow starting with: {domain_method}") result, artifacts = self.domain_delineator.define_domain() self.delineation_artifacts = artifacts if result: self.logger.info(f"Domain definition completed using method: {domain_method}") # Generate diagnostic plots if enabled if self.reporting_manager and artifacts.river_basins_path: with symfluence_error_handler( "generating domain definition diagnostics", self.logger, reraise=False, error_type=GeospatialError ): import geopandas as gpd basin_gdf = gpd.read_file(artifacts.river_basins_path) dem_path = self.project_attributes_dir / 'elevation' / 'dem' / f"{self.domain_name}_elv.tif" self.reporting_manager.diagnostic_domain_definition( basin_gdf=basin_gdf, dem_path=dem_path if dem_path.exists() else None ) self.logger.debug("Domain definition workflow finished") return result, artifacts
[docs] def discretize_domain( self, ) -> Tuple[Optional[Union[Path, dict]], DiscretizationArtifacts]: """ Discretize the domain into HRUs or GRUs. Returns: Tuple of HRU shapefile(s) and discretization artifacts """ with symfluence_error_handler( "domain discretization", self.logger, error_type=GeospatialError ): discretization_method = self._get_config_value( lambda: self.config.domain.discretization ) self.logger.debug(f"Discretizing domain using method: {discretization_method}") # Initialize discretizer if not already done if self.domain_discretizer is None: self.domain_discretizer = DomainDiscretizationRunner(self.config, self.logger) # Perform discretization hru_shapefile, artifacts = self.domain_discretizer.discretize_domain() self.discretization_artifacts = artifacts # Visualize the discretized domain self.visualize_discretized_domain() # Generate diagnostic plots if enabled if self.reporting_manager and artifacts.hru_paths: with symfluence_error_handler( "generating discretization diagnostics", self.logger, reraise=False, error_type=GeospatialError ): import geopandas as gpd # hru_paths can be Path or Dict[str, Path] hru_path = artifacts.hru_paths if isinstance(hru_path, dict): # Take first path from dict hru_path = next(iter(hru_path.values())) hru_gdf = gpd.read_file(hru_path) self.reporting_manager.diagnostic_discretization( hru_gdf=hru_gdf, method=discretization_method ) return hru_shapefile, artifacts
[docs] def visualize_domain(self) -> Optional[Path]: """ Create visualization of the domain. Returns: Path to the created plot or None if failed """ if self.reporting_manager: return self.reporting_manager.visualize_domain() return None
[docs] def visualize_discretized_domain(self) -> Optional[Path]: """ Create visualization of the discretized domain. Returns: Path to the created plot or None if failed """ if self.reporting_manager: discretization_method = self._get_config_value( lambda: self.config.domain.discretization ) domain_method = self._get_config_value( lambda: self.config.domain.definition_method ) if domain_method != 'point': return self.reporting_manager.visualize_discretized_domain(discretization_method) else: self.logger.info('Point scale model, not creating visualisation') return None return None
[docs] def get_domain_info(self) -> Dict[str, Any]: """ Get information about the current domain configuration. Returns: Dictionary containing domain information """ info = { 'domain_name': self.domain_name, 'domain_method': self._get_config_value( lambda: self.config.domain.definition_method ), 'spatial_mode': self._get_config_value( lambda: self.config.domain.definition_method ), 'discretization_method': self._get_config_value( lambda: self.config.domain.discretization ), 'pour_point_coords': self._get_config_value( lambda: self.config.domain.pour_point_coords ), 'bounding_box': self._get_config_value( lambda: self.config.domain.bounding_box_coords ), 'project_dir': str(self.project_dir), } # Add shapefile paths if they exist river_basins_path = self.project_dir / "shapefiles" / "river_basins" catchment_path = self.project_dir / "shapefiles" / "catchment" if river_basins_path.exists(): info['river_basins_path'] = str(river_basins_path) if catchment_path.exists(): info['catchment_path'] = str(catchment_path) return info
[docs] def validate_domain_configuration(self) -> bool: """ Validate domain configuration settings. .. deprecated:: Use :meth:`validate_readiness` instead. Config-level checks (required keys, definition method, bounding box format) are now handled by Pydantic validators at config construction time. Returns: True if configuration is valid, False otherwise """ import warnings warnings.warn( "validate_domain_configuration() is deprecated, use validate_readiness()", DeprecationWarning, stacklevel=2, ) readiness = self.validate_readiness() return all(readiness.values()) if readiness else True
[docs] def validate_readiness(self) -> Dict[str, bool]: """ Validate that this manager is ready for execution. Checks runtime prerequisites that Pydantic cannot verify at config construction time: subset geofabric config and grid source for distributed method. Returns: Dict mapping check names to pass/fail booleans. """ results = {} # Validate subset configuration subset_from_geofabric = self._get_config_value( lambda: self.config.domain.subset_from_geofabric, default=False ) if subset_from_geofabric: geofabric_type = self._get_config_value( lambda: self.config.domain.delineation.geofabric_type, default='na' ) ok = geofabric_type not in ('na', '', None) if not ok: self.logger.error( "subset_from_geofabric=True requires GEOFABRIC_TYPE to be set. " "Valid values: merit, tdx, nws, hydrosheds, etc." ) results['subset_config'] = ok # Validate grid_source for distributed method domain_method = self._get_config_value( lambda: self.config.domain.definition_method ) if domain_method == 'distributed': grid_source = self._get_config_value( lambda: self.config.domain.grid_source, default='generate' ) valid_grid_sources = ['generate', 'native'] ok = grid_source in valid_grid_sources if not ok: self.logger.error( f"Invalid grid_source: {grid_source}. Must be one of {valid_grid_sources}" ) results['grid_source'] = ok return results