# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright (C) 2024-2026 SYMFLUENCE Team <dev@symfluence.org>
"""
Project management for SYMFLUENCE hydrological modeling setups.
Handles project directory structure creation, pour point generation,
and project metadata management for hydrological model domains.
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional
import geopandas as gpd
from shapely.geometry import Point
from symfluence.core.mixins import ConfigurableMixin
if TYPE_CHECKING:
from symfluence.core.config.models import SymfluenceConfig
[docs]
class ProjectManager(ConfigurableMixin):
"""
Manages project-level operations including directory structure and initialization.
The ProjectManager is responsible for creating and managing the project directory
structure, handling pour point creation, and maintaining project metadata. It serves
as the foundation for all other SYMFLUENCE components by establishing the physical
file organization that the workflow depends on.
Key responsibilities:
- Creating the project directory structure
- Generating pour point shapefiles from coordinates
- Validating project structure integrity
- Providing project metadata to other components
Attributes:
config (Dict[str, Any]): Configuration dictionary containing project settings
logger (logging.Logger): Logger instance for recording operations
"""
[docs]
def __init__(self, config: 'SymfluenceConfig', logger: logging.Logger):
"""
Initialize the ProjectManager.
Args:
config: SymfluenceConfig instance
logger: Logger instance
Raises:
TypeError: If config is not a SymfluenceConfig instance
"""
# Import here to avoid circular imports at module level
from symfluence.core.config.models import SymfluenceConfig
if not isinstance(config, SymfluenceConfig):
raise TypeError(
f"config must be SymfluenceConfig, got {type(config).__name__}. "
"Use SymfluenceConfig.from_file() to load configuration."
)
# Set config via the ConfigMixin property
self._config = config
self.logger = logger
[docs]
def setup_project(self) -> Path:
"""
Set up the project directory structure.
Creates the main project directory and all required subdirectories.
Project layout (canonical, post-2026):
{project_dir}/
shapefiles/{pour_point, catchment, river_network, river_basins}/
data/
attributes/ <- DEM, soil, landclass, etc.
forcing/ <- raw + merged + basin-averaged forcing
observations/ <- streamflow, snotel, etc.
model_ready/ <- model-agnostic store
The ``data/`` prefix is created up-front so that
``resolve_data_subdir`` consistently resolves to the new layout
for fresh projects. Without this, ``setup_project`` used to
create the legacy ``attributes/`` directory directly, which made
``resolve_data_subdir`` pick the legacy path on subsequent
reads. Some downstream callers (e.g. TauDEM in
``define_domain``) construct the path from string templates
anchored at ``data/attributes/...`` and then fail to find the
DEM that was written into the legacy ``attributes/...`` tree.
Legacy projects with a pre-existing ``attributes/`` directory
continue to work via the backward-compat branch in
``resolve_data_subdir`` — this change only affects fresh
``setup_project`` runs.
Returns:
Path: Path to the created project directory
Raises:
OSError: If directory creation fails due to permission or disk space issues
"""
self.logger.info(f"Setting up project for domain: {self.domain_name}")
# Create main project directory
self.project_dir.mkdir(parents=True, exist_ok=True)
# Top-level shapefile structure (unchanged — shapefiles live
# outside data/ by convention since they are domain-defining
# artefacts produced before any data acquisition).
shapefile_subdirs = ['pour_point', 'catchment', 'river_network', 'river_basins']
shapefiles_path = self.project_dir / 'shapefiles'
shapefiles_path.mkdir(parents=True, exist_ok=True)
for subdir in shapefile_subdirs:
(shapefiles_path / subdir).mkdir(parents=True, exist_ok=True)
# Canonical data subtree. We only create ``data/{subdir}`` when
# no legacy ``{subdir}`` already exists at the project root.
# Pre-staged test fixtures and legacy domains keep working
# because resolve_data_subdir still finds their legacy paths.
# Fresh projects get the canonical layout because neither path
# exists yet, so we create ``data/{subdir}`` and resolve_data_subdir
# picks it on every read.
data_subdirs = ['attributes', 'forcing', 'observations', 'model_ready']
data_path = self.project_dir / 'data'
data_path.mkdir(parents=True, exist_ok=True)
for subdir in data_subdirs:
legacy_path = self.project_dir / subdir
if legacy_path.exists():
self.logger.debug(
f"Legacy '{subdir}/' directory present at {legacy_path}; "
f"skipping creation of data/{subdir} so resolve_data_subdir "
f"keeps using the legacy path."
)
continue
(data_path / subdir).mkdir(parents=True, exist_ok=True)
self.logger.info(f"Project directory created at: {self.project_dir}")
return self.project_dir
[docs]
def create_pour_point(self) -> Optional[Path]:
"""
Create the outlet (pour point) shapefile from coordinates if specified.
Writes a GeoDataFrame of one or more point geometries and saves it as a
shapefile. ``POUR_POINT_COORDS`` is the primary, most-downstream outlet
that defines the domain extent (id 0). Any ``POUR_POINT_ADDITIONAL_COORDS``
are written as additional interior outlets (id 1..N) so TauDEM breaks the
stream network at each, forcing subbasin/GRU boundaries to align with
interior gauges. Additional outlets only apply to ``semidistributed`` /
``distributed`` delineation; they are ignored (with a warning) for other
methods, where a single basin is delineated. If ``POUR_POINT_COORDS`` is
'default', assumes a user-provided pour point shapefile exists.
Returns:
Optional[Path]: Path to the created pour point shapefile if successful,
None if using a user-provided shapefile or if creation fails
Raises:
ValueError: If the pour point coordinates are in an invalid format
Exception: For other errors during shapefile creation
"""
from symfluence.core.validation import parse_pour_point_coords
# Check if using user-provided shapefile
pour_point_coords = self._get_config_value(
lambda: self.config.domain.pour_point_coords,
'default'
)
if str(pour_point_coords).lower() == 'default':
self.logger.info("Using user-provided pour point shapefile")
return None
try:
# Parse the primary outlet (id 0).
lat, lon = map(float, str(pour_point_coords).split('/'))
points = [Point(lon, lat)] # Note: Point takes (lon, lat) order
# Append additional interior outlets, but only for distributed methods —
# for lumped/point a single basin is delineated and extra outlets would
# produce multiple/ambiguous watersheds.
additional = self._get_config_value(
lambda: self.config.domain.pour_point_additional_coords,
default=None,
dict_key='POUR_POINT_ADDITIONAL_COORDS',
)
if additional:
method = str(self._get_config_value(
lambda: self.config.domain.definition_method,
default='lumped',
dict_key='DOMAIN_DEFINITION_METHOD',
)).lower()
if method in ('semidistributed', 'distributed'):
extra_pairs = parse_pour_point_coords(
additional, context='POUR_POINT_ADDITIONAL_COORDS'
)
points.extend(Point(p_lon, p_lat) for p_lat, p_lon in extra_pairs)
self.logger.info(
f"Added {len(extra_pairs)} interior outlet(s) to the pour "
f"point shapefile for subbasin-aligned delineation."
)
else:
self.logger.warning(
f"POUR_POINT_ADDITIONAL_COORDS is set but DOMAIN_DEFINITION_METHOD "
f"is '{method}'; additional outlets only apply to "
f"'semidistributed'/'distributed' and will be ignored."
)
# Create GeoDataFrame with a stable integer 'id' (0 = primary outlet)
# that survives the TauDEM round-trip for later gauge<->subbasin mapping.
gdf = gpd.GeoDataFrame(
{'id': list(range(len(points))), 'geometry': points},
crs="EPSG:4326",
)
# Determine output path
output_path = self.project_dir / "shapefiles" / "pour_point"
shp_path = self._get_config_value(
lambda: self.config.paths.pour_point_shp_path,
'default'
)
if shp_path != 'default' and shp_path is not None:
output_path = Path(shp_path)
# Determine shapefile name
pour_point_shp_name = f"{self.domain_name}_pourPoint.shp"
shp_name = self._get_config_value(
lambda: self.config.paths.pour_point_shp_name,
'default'
)
if shp_name != 'default' and shp_name is not None:
pour_point_shp_name = shp_name
# Ensure output directory exists
output_path.mkdir(parents=True, exist_ok=True)
output_file = output_path / pour_point_shp_name
# Save shapefile
gdf.to_file(output_file)
self.logger.info(f"Pour point shapefile created successfully: {output_file}")
return output_file
except ValueError:
self.logger.error("Invalid pour point coordinates format. Expected 'lat/lon'.")
return None
except Exception as e: # noqa: BLE001 — configuration resilience
self.logger.error(f"Error creating pour point shapefile: {str(e)}")
return None
[docs]
def get_project_info(self) -> Dict[str, Any]:
"""
Get information about the project configuration.
Collects key project metadata into a dictionary for reporting,
logging, or providing status information to other components.
The returned information includes:
- Domain name
- Experiment ID
- Project directory path
- Data directory path
- Pour point coordinates
Returns:
Dict[str, Any]: Dictionary containing project information
"""
info = {
'domain_name': self.domain_name,
'experiment_id': self.experiment_id,
'project_dir': str(self.project_dir),
'data_dir': str(self.data_dir),
'pour_point_coords': self._get_config_value(
lambda: self.config.domain.pour_point_coords
)
}
return info