Source code for symfluence.reporting.reporting_manager

# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright (C) 2024-2026 SYMFLUENCE Team <dev@symfluence.org>

"""Central reporting facade for coordinating all SYMFLUENCE visualizations.

Provides a unified interface for generating publication-ready visualizations
across all modeling stages: domain setup, calibration, evaluation, and
multi-model comparison. Implements the Facade pattern to orchestrate
specialized plotters while hiding complexity from client code.

Heavy lifting is delegated to three orchestrators:
- ``ModelOutputOrchestrator``: registry-based model output dispatch
- ``CalibrationOrchestrator``: post-calibration target dispatch and comparison plots
- ``DiagnosticsOrchestrator``: per-workflow-step diagnostic validation plots
"""
from __future__ import annotations

from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

from symfluence.core.constants import ConfigKeys
from symfluence.core.exceptions import ReportingError, symfluence_error_handler
from symfluence.core.mixins import ConfigMixin

# Config
from symfluence.reporting.config.plot_config import DEFAULT_PLOT_CONFIG, PlotConfig
from symfluence.reporting.core.decorators import skip_if_not_diagnostic, skip_if_not_visualizing

# Type hints only - actual imports are lazy
if TYPE_CHECKING:
    from symfluence.core.config.models import SymfluenceConfig
    from symfluence.reporting.orchestrators.calibration_orchestrator import CalibrationOrchestrator
    from symfluence.reporting.orchestrators.diagnostics_orchestrator import DiagnosticsOrchestrator
    from symfluence.reporting.orchestrators.model_output_orchestrator import ModelOutputOrchestrator
    from symfluence.reporting.plotters.analysis_plotter import AnalysisPlotter
    from symfluence.reporting.plotters.benchmark_plotter import BenchmarkPlotter
    from symfluence.reporting.plotters.diagnostic_plotter import DiagnosticPlotter
    from symfluence.reporting.plotters.domain_plotter import DomainPlotter
    from symfluence.reporting.plotters.forcing_comparison_plotter import ForcingComparisonPlotter
    from symfluence.reporting.plotters.model_comparison_plotter import ModelComparisonPlotter
    from symfluence.reporting.plotters.optimization_plotter import OptimizationPlotter
    from symfluence.reporting.plotters.snow_plotter import SnowPlotter
    from symfluence.reporting.plotters.workflow_diagnostic_plotter import WorkflowDiagnosticPlotter
    from symfluence.reporting.processors.data_processor import DataProcessor
    from symfluence.reporting.processors.spatial_processor import SpatialProcessor


[docs] class ReportingManager(ConfigMixin): """Central facade coordinating all visualization and reporting in SYMFLUENCE. Orchestrates diverse visualization workflows by delegating to specialized plotters for domain maps, calibration analysis, performance benchmarking, and diagnostics. Uses Facade and Lazy Initialization patterns. Example: >>> rm = ReportingManager(config, logger, visualize=True) >>> rm.plot_domain() # Generate domain overview map >>> rm.plot_calibration() # Plot calibration convergence """
[docs] def __init__(self, config: 'SymfluenceConfig', logger: Any, visualize: bool = False, diagnostic: bool = False): """Initialize the ReportingManager. Args: config: SymfluenceConfig instance. logger: Logger instance. visualize: Boolean flag indicating if visualization is enabled. diagnostic: Boolean flag indicating if diagnostic mode is enabled. """ from symfluence.core.config.models import SymfluenceConfig if not isinstance(config, SymfluenceConfig): raise TypeError( f"config must be SymfluenceConfig, got {type(config).__name__}. " "Use SymfluenceConfig.from_file() to load configuration." ) self._config = config self.logger = logger self.visualize = visualize self.diagnostic = diagnostic self.project_dir = Path(config.system.data_dir) / f"domain_{config.domain.name}"
# ========================================================================= # Component Properties (Lazy Initialization via cached_property) # ========================================================================= @cached_property def plot_config(self) -> PlotConfig: """Lazy initialization of plot configuration.""" return DEFAULT_PLOT_CONFIG @cached_property def data_processor(self) -> 'DataProcessor': """Lazy initialization of data processor.""" from symfluence.reporting.processors.data_processor import DataProcessor return DataProcessor(self.config, self.logger) @cached_property def spatial_processor(self) -> 'SpatialProcessor': """Lazy initialization of spatial processor.""" from symfluence.reporting.processors.spatial_processor import SpatialProcessor return SpatialProcessor(self.config, self.logger) @cached_property def domain_plotter(self) -> 'DomainPlotter': """Lazy initialization of domain plotter.""" from symfluence.reporting.plotters.domain_plotter import DomainPlotter return DomainPlotter(self.config, self.logger, self.plot_config) @cached_property def optimization_plotter(self) -> 'OptimizationPlotter': """Lazy initialization of optimization plotter.""" from symfluence.reporting.plotters.optimization_plotter import OptimizationPlotter return OptimizationPlotter(self.config, self.logger, self.plot_config) @cached_property def analysis_plotter(self) -> 'AnalysisPlotter': """Lazy initialization of analysis plotter.""" from symfluence.reporting.plotters.analysis_plotter import AnalysisPlotter return AnalysisPlotter(self.config, self.logger, self.plot_config) @cached_property def benchmark_plotter(self) -> 'BenchmarkPlotter': """Lazy initialization of benchmark plotter.""" from symfluence.reporting.plotters.benchmark_plotter import BenchmarkPlotter return BenchmarkPlotter(self.config, self.logger, self.plot_config) @cached_property def snow_plotter(self) -> 'SnowPlotter': """Lazy initialization of snow plotter.""" from symfluence.reporting.plotters.snow_plotter import SnowPlotter return SnowPlotter(self.config, self.logger, self.plot_config) @cached_property def diagnostic_plotter(self) -> 'DiagnosticPlotter': """Lazy initialization of diagnostic plotter.""" from symfluence.reporting.plotters.diagnostic_plotter import DiagnosticPlotter return DiagnosticPlotter(self.config, self.logger, self.plot_config) @cached_property def model_comparison_plotter(self) -> 'ModelComparisonPlotter': """Lazy initialization of model comparison plotter.""" from symfluence.reporting.plotters.model_comparison_plotter import ModelComparisonPlotter return ModelComparisonPlotter(self.config, self.logger, self.plot_config) @cached_property def forcing_comparison_plotter(self) -> 'ForcingComparisonPlotter': """Lazy initialization of forcing comparison plotter.""" from symfluence.reporting.plotters.forcing_comparison_plotter import ForcingComparisonPlotter return ForcingComparisonPlotter(self.config, self.logger, self.plot_config) @cached_property def workflow_diagnostic_plotter(self) -> 'WorkflowDiagnosticPlotter': """Lazy initialization of workflow diagnostic plotter.""" from symfluence.reporting.plotters.workflow_diagnostic_plotter import WorkflowDiagnosticPlotter return WorkflowDiagnosticPlotter(self.config, self.logger, self.plot_config) # ========================================================================= # Orchestrator Properties (Lazy Initialization) # ========================================================================= @cached_property def _model_output_orchestrator(self) -> 'ModelOutputOrchestrator': from symfluence.reporting.orchestrators.model_output_orchestrator import ModelOutputOrchestrator return ModelOutputOrchestrator( config=self.config, logger=self.logger, visualize=self.visualize, plot_config=self.plot_config, analysis_plotter=self.analysis_plotter, ) @cached_property def _calibration_orchestrator(self) -> 'CalibrationOrchestrator': from symfluence.reporting.orchestrators.calibration_orchestrator import CalibrationOrchestrator return CalibrationOrchestrator( config=self.config, logger=self.logger, visualize=self.visualize, project_dir=self.project_dir, optimization_plotter=self.optimization_plotter, model_comparison_plotter=self.model_comparison_plotter, analysis_plotter=self.analysis_plotter, model_output_orchestrator=self._model_output_orchestrator, ) @cached_property def _diagnostics_orchestrator(self) -> 'DiagnosticsOrchestrator': from symfluence.reporting.orchestrators.diagnostics_orchestrator import DiagnosticsOrchestrator return DiagnosticsOrchestrator( config=self.config, logger=self.logger, diagnostic=self.diagnostic, project_dir=self.project_dir, workflow_diagnostic_plotter=self.workflow_diagnostic_plotter, ) # ========================================================================= # Public Methods — Data Processing & Utility # =========================================================================
[docs] @skip_if_not_visualizing() def visualize_data_distribution(self, data: Any, variable_name: str, stage: str) -> None: """Visualize data distribution (histogram/boxplot).""" self.diagnostic_plotter.plot_data_distribution(data, variable_name, stage)
[docs] @skip_if_not_visualizing() def visualize_spatial_coverage(self, raster_path: Path, variable_name: str, stage: str) -> None: """Visualize spatial coverage of raster data.""" self.diagnostic_plotter.plot_spatial_coverage(raster_path, variable_name, stage)
[docs] @skip_if_not_visualizing() def visualize_forcing_comparison( self, raw_forcing_file: Path, remapped_forcing_file: Path, forcing_grid_shp: Path, hru_shp: Path, variable: str = 'precipitation_flux', time_index: int = 0, ) -> Optional[str]: """Visualize raw vs. remapped forcing data comparison.""" self.logger.info("Creating raw vs. remapped forcing comparison visualization...") return self.forcing_comparison_plotter.plot_raw_vs_remapped( raw_forcing_file=raw_forcing_file, remapped_forcing_file=remapped_forcing_file, forcing_grid_shp=forcing_grid_shp, hru_shp=hru_shp, variable=variable, time_index=time_index, )
[docs] def is_visualization_enabled(self) -> bool: """Check if visualization is enabled.""" return self.visualize
[docs] def update_sim_reach_id(self, config_path: Optional[str] = None) -> Optional[int]: """Update the SIM_REACH_ID in both the config object and YAML file.""" return self.spatial_processor.update_sim_reach_id(config_path)
# --- Domain Visualization ---
[docs] @skip_if_not_visualizing() def visualize_domain(self) -> Optional[str]: """Visualize the domain boundaries and features.""" self.logger.info("Creating domain visualization...") return self.domain_plotter.plot_domain()
[docs] @skip_if_not_visualizing() def visualize_discretized_domain(self, discretization_method: str) -> Optional[str]: """Visualize the discretized domain (HRUs/GRUs).""" self.logger.info(f"Creating discretization visualization for {discretization_method}...") return self.domain_plotter.plot_discretized_domain(discretization_method)
# --- Model Output Visualization (delegates to ModelOutputOrchestrator) ---
[docs] @skip_if_not_visualizing() def visualize_model_outputs(self, model_outputs: List[Tuple[str, str]], obs_files: List[Tuple[str, str]]) -> Optional[str]: """Visualize model outputs (streamflow comparison).""" return self._model_output_orchestrator.visualize_model_outputs(model_outputs, obs_files)
[docs] @skip_if_not_visualizing() def visualize_lumped_model_outputs(self, model_outputs: List[Tuple[str, str]], obs_files: List[Tuple[str, str]]) -> Optional[str]: """Visualize lumped model outputs.""" return self._model_output_orchestrator.visualize_lumped_model_outputs(model_outputs, obs_files)
[docs] @skip_if_not_visualizing() def visualize_fuse_outputs(self, model_outputs: List[Tuple[str, str]], obs_files: List[Tuple[str, str]]) -> Optional[str]: """Visualize FUSE model outputs.""" return self._model_output_orchestrator.visualize_fuse_outputs(model_outputs, obs_files)
[docs] @skip_if_not_visualizing(default={}) def visualize_summa_outputs(self, experiment_id: str) -> Dict[str, str]: """Visualize SUMMA model outputs (all variables).""" return self._model_output_orchestrator.visualize_summa_outputs(experiment_id)
[docs] @skip_if_not_visualizing() def visualize_ngen_results(self, sim_df: Any, obs_df: Optional[Any], experiment_id: str, results_dir: Path) -> None: """Visualize NGen streamflow plots.""" self._model_output_orchestrator.visualize_ngen_results(sim_df, obs_df, experiment_id, results_dir)
[docs] @skip_if_not_visualizing() def visualize_lstm_results(self, results_df: Any, obs_streamflow: Any, obs_snow: Any, use_snow: bool, output_dir: Path, experiment_id: str) -> None: """Visualize LSTM simulation results.""" self._model_output_orchestrator.visualize_lstm_results( results_df, obs_streamflow, obs_snow, use_snow, output_dir, experiment_id, )
[docs] @skip_if_not_visualizing() def visualize_hype_results(self, sim_flow: Any, obs_flow: Any, outlet_id: str, domain_name: str, experiment_id: str, project_dir: Path) -> None: """Visualize HYPE streamflow comparison.""" self._model_output_orchestrator.visualize_hype_results( sim_flow, obs_flow, outlet_id, domain_name, experiment_id, project_dir, )
[docs] @skip_if_not_visualizing() def visualize_model_results(self, model_name: str, **kwargs) -> Optional[Any]: """Visualize model results using registry-based dispatch.""" return self._model_output_orchestrator.visualize_model_results(model_name, **kwargs)
# --- Analysis Visualization ---
[docs] @skip_if_not_visualizing() def visualize_timeseries_results(self) -> None: """Visualize timeseries results from the standard results file.""" self.logger.info("Creating timeseries visualizations from results file...") with symfluence_error_handler( "creating timeseries visualizations", self.logger, reraise=False, error_type=ReportingError, ): df = self.data_processor.read_results_file() exp_id = self._get_config_value(lambda: self.config.domain.experiment_id, default='default', dict_key=ConfigKeys.EXPERIMENT_ID) domain_name = self._get_config_value(lambda: self.config.domain.name, default='unknown', dict_key=ConfigKeys.DOMAIN_NAME) self.analysis_plotter.plot_timeseries_results(df, exp_id, domain_name) self.analysis_plotter.plot_diagnostics(df, exp_id, domain_name)
[docs] @skip_if_not_visualizing(default=[]) def visualize_benchmarks(self, benchmark_results: Dict[str, Any]) -> List[str]: """Visualize benchmark results.""" self.logger.info("Creating benchmark visualizations...") return self.benchmark_plotter.plot_benchmarks(benchmark_results)
[docs] @skip_if_not_visualizing(default={}) def visualize_snow_comparison(self, model_outputs: List[List[str]]) -> Dict[str, Any]: """Visualize snow comparison.""" self.logger.info("Creating snow comparison visualization...") formatted_outputs = [tuple(item) for item in model_outputs] return self.snow_plotter.plot_snow_comparison(formatted_outputs)
[docs] @skip_if_not_visualizing() def visualize_optimization_progress(self, history: List[Dict], output_dir: Path, calibration_variable: str, metric: str) -> None: """Visualize optimization progress.""" self.logger.info("Creating optimization progress visualization...") self.optimization_plotter.plot_optimization_progress(history, output_dir, calibration_variable, metric)
[docs] @skip_if_not_visualizing() def visualize_optimization_depth_parameters(self, history: List[Dict], output_dir: Path) -> None: """Visualize depth parameter evolution.""" self.logger.info("Creating depth parameter visualization...") self.optimization_plotter.plot_depth_parameters(history, output_dir)
[docs] @skip_if_not_visualizing() def visualize_sensitivity_analysis(self, sensitivity_data: Any, output_file: Path, plot_type: str = 'single') -> None: """Visualize sensitivity analysis results.""" self.logger.info(f"Creating sensitivity analysis visualization ({plot_type})...") self.analysis_plotter.plot_sensitivity_analysis(sensitivity_data, output_file, plot_type)
[docs] @skip_if_not_visualizing() def visualize_decision_impacts(self, results_file: Path, output_folder: Path) -> None: """Visualize decision analysis impacts.""" self.logger.info("Creating decision impact visualizations...") self.analysis_plotter.plot_decision_impacts(results_file, output_folder)
[docs] @skip_if_not_visualizing() def visualize_hydrographs_with_highlight(self, results_file: Path, simulation_results: Dict, observed_streamflow: Any, decision_options: Dict, output_folder: Path, metric: str = 'kge') -> None: """Visualize hydrographs with top performers highlighted.""" self.logger.info(f"Creating hydrograph visualization with {metric} highlight...") self.analysis_plotter.plot_hydrographs_with_highlight( results_file, simulation_results, observed_streamflow, decision_options, output_folder, metric, )
[docs] @skip_if_not_visualizing() def visualize_drop_analysis(self, drop_data: List[Dict], optimal_threshold: float, project_dir: Path) -> None: """Visualize drop analysis for stream threshold selection.""" self.logger.info("Creating drop analysis visualization...") self.analysis_plotter.plot_drop_analysis(drop_data, optimal_threshold, project_dir)
# --- Calibration Visualization (delegates to CalibrationOrchestrator) ---
[docs] @skip_if_not_visualizing() def generate_model_comparison_overview( self, experiment_id: Optional[str] = None, context: str = 'run_model', ) -> Optional[str]: """Generate model comparison overview for all models with valid output.""" return self._calibration_orchestrator.generate_model_comparison_overview( experiment_id=experiment_id, context=context, )
[docs] @skip_if_not_visualizing(default={}) def visualize_calibration_results( self, experiment_id: Optional[str] = None, calibration_target: Optional[str] = None, ) -> Dict[str, str]: """Generate comprehensive post-calibration visualizations.""" return self._calibration_orchestrator.visualize_calibration_results( experiment_id=experiment_id, calibration_target=calibration_target, )
# ========================================================================= # Workflow Diagnostic Methods (delegates to DiagnosticsOrchestrator) # =========================================================================
[docs] @skip_if_not_diagnostic() def diagnostic_domain_definition(self, basin_gdf: Any, dem_path: Optional[Path] = None) -> Optional[str]: """Generate diagnostic plots for domain definition step.""" return self._diagnostics_orchestrator.diagnostic_domain_definition(basin_gdf, dem_path)
[docs] @skip_if_not_diagnostic() def diagnostic_discretization(self, hru_gdf: Any, method: str) -> Optional[str]: """Generate diagnostic plots for discretization step.""" return self._diagnostics_orchestrator.diagnostic_discretization(hru_gdf, method)
[docs] @skip_if_not_diagnostic() def diagnostic_observations(self, obs_df: Any, obs_type: str) -> Optional[str]: """Generate diagnostic plots for observation processing step.""" return self._diagnostics_orchestrator.diagnostic_observations(obs_df, obs_type)
[docs] @skip_if_not_diagnostic() def diagnostic_forcing_raw(self, forcing_nc: Path, domain_shp: Optional[Path] = None) -> Optional[str]: """Generate diagnostic plots for raw forcing acquisition step.""" return self._diagnostics_orchestrator.diagnostic_forcing_raw(forcing_nc, domain_shp)
[docs] @skip_if_not_diagnostic() def diagnostic_forcing_remapped( self, raw_nc: Path, remapped_nc: Path, hru_shp: Optional[Path] = None, ) -> Optional[str]: """Generate diagnostic plots for forcing remapping step.""" return self._diagnostics_orchestrator.diagnostic_forcing_remapped(raw_nc, remapped_nc, hru_shp)
[docs] @skip_if_not_diagnostic() def diagnostic_model_preprocessing(self, input_dir: Path, model_name: str) -> Optional[str]: """Generate diagnostic plots for model preprocessing step.""" return self._diagnostics_orchestrator.diagnostic_model_preprocessing(input_dir, model_name)
[docs] @skip_if_not_diagnostic() def diagnostic_model_output(self, output_nc: Path, model_name: str) -> Optional[str]: """Generate diagnostic plots for model output step.""" return self._diagnostics_orchestrator.diagnostic_model_output(output_nc, model_name)
[docs] @skip_if_not_diagnostic() def diagnostic_attributes( self, dem_path: Optional[Path] = None, soil_path: Optional[Path] = None, land_path: Optional[Path] = None, ) -> Optional[str]: """Generate diagnostic plots for attribute acquisition step.""" return self._diagnostics_orchestrator.diagnostic_attributes(dem_path, soil_path, land_path)
[docs] @skip_if_not_diagnostic() def diagnostic_calibration( self, history: Optional[List[Dict]] = None, best_params: Optional[Dict[str, float]] = None, obs_vs_sim: Optional[Dict[str, Any]] = None, model_name: str = 'Unknown', ) -> Optional[str]: """Generate diagnostic plots for calibration step.""" return self._diagnostics_orchestrator.diagnostic_calibration( history=history, best_params=best_params, obs_vs_sim=obs_vs_sim, model_name=model_name, )
[docs] @skip_if_not_diagnostic() def diagnostic_coupling_conservation( self, graph: Any, output_dir: Optional[Path] = None, ) -> Optional[str]: """Generate conservation diagnostic for a coupled model run.""" return self._diagnostics_orchestrator.diagnostic_coupling_conservation(graph, output_dir)