Source code for hydrobricks.trainer

from __future__ import annotations

import importlib
import logging
import os
from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable

import numpy as np

from hydrobricks import spotpy
from hydrobricks._exceptions import ConfigurationError, DataError, ModelError

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from hydrobricks.forcing import Forcing
    from hydrobricks.models.model import Model
    from hydrobricks.observations import Observations
    from hydrobricks.parameters import ParameterSet


[docs] class SpotpySetup: """Setup class for SPOTPY optimization framework integration.""" def __init__( self, model: Model | list[Model], params: ParameterSet, forcing: Forcing | list[Forcing], obs: Observations | list[Observations], warmup: int = 365, obj_func: str | Callable[[np.ndarray, np.ndarray], float] | None = None, invert_obj_func: bool = False, dump_outputs: bool = False, dump_forcing: bool = False, dump_dir: str = "", ) -> None: """ Initialize SPOTPY setup for model calibration. Parameters ---------- model Single Model instance or list of Model instances for ensemble calibration. params ParameterSet defining the model parameters to calibrate. forcing Single Forcing instance or list of Forcing instances (one per model). obs Single Observations instance or list for observed data (one per model). warmup Number of days for model warmup period (skipped in evaluation). Default: 365 obj_func Objective function for optimization. If None, uses non-parametric Kling-Gupta Efficiency. Can be a string (HydroErr function name) or callable that takes (observed, simulated) and returns a scalar. Default: None invert_obj_func If True, multiply objective function result by -1 (for minimizers). Default: False dump_outputs If True, save all simulation outputs to disk. Default: False dump_forcing If True, save forcing data used in simulations. Default: False dump_dir Directory path for saving dumped outputs. Default: '' (current directory) Raises ------ ConfigurationError If invalid configuration parameters or mismatched ensemble sizes. DataError If observation data is invalid or missing. """ # Validate and normalize inputs self.model = self._normalize_model_list(model) self.params = params self.forcing = self._normalize_forcing_list(forcing) self.obs = self._normalize_observations_list(obs) # Validate ensemble sizes match self._validate_ensemble_sizes() # Validate and set configuration parameters self._validate_and_set_parameters( warmup, dump_dir, invert_obj_func, dump_outputs, dump_forcing ) # Initialize SPOTPY parameters self.params_spotpy = params.get_for_spotpy() self.random_forcing = params.needs_random_forcing() # Setup forcing and models self._setup_forcing_and_models() # Validate objective function self._validate_and_set_objective_function(obj_func) def _normalize_model_list(self, model: Model | list[Model]) -> list[Model]: """ Validate and normalize model input to a list. Parameters ---------- model Single Model instance or list of Model instances. Returns ------- list[Model] Validated list of Model instances. Raises ------ ConfigurationError If model is None or empty list. """ if model is None: raise ConfigurationError( "Model cannot be None.", item_name="model", reason="Required parameter missing", ) models = [model] if not isinstance(model, list) else model if len(models) == 0: raise ConfigurationError( "Model list cannot be empty.", item_name="model", reason="Empty model list", ) return models def _normalize_forcing_list( self, forcing: Forcing | list[Forcing] ) -> list[Forcing]: """ Validate and normalize forcing input to a list. Parameters ---------- forcing Single Forcing instance or list of Forcing instances. Returns ------- list[Forcing] Validated list of Forcing instances. Raises ------ ConfigurationError If forcing is None or empty list. """ if forcing is None: raise ConfigurationError( "Forcing cannot be None.", item_name="forcing", reason="Required parameter missing", ) forcings = [forcing] if not isinstance(forcing, list) else forcing if len(forcings) == 0: raise ConfigurationError( "Forcing list cannot be empty.", item_name="forcing", reason="Empty forcing list", ) return forcings def _normalize_observations_list( self, obs: Observations | list[Observations] ) -> list[np.ndarray]: """ Validate and normalize observations to a list of numpy arrays. Parameters ---------- obs Single Observations instance or list of Observations instances. Returns ------- list[np.ndarray] Validated list of observation data arrays. Raises ------ ConfigurationError If obs is None or empty list. DataError If observation data is missing, invalid, or empty. """ if obs is None: raise ConfigurationError( "Observations cannot be None.", item_name="obs", reason="Required parameter missing", ) obs_list = [obs] if not isinstance(obs, list) else obs if len(obs_list) == 0: raise ConfigurationError( "Observations list cannot be empty.", item_name="obs", reason="Empty observations list", ) observations = [] for idx, o in enumerate(obs_list): if not hasattr(o, "data") or o.data is None: raise DataError( f"Observations at index {idx} has no data attribute.", data_type="observations", reason="Missing or invalid observation data", ) if len(o.data) == 0: raise DataError( f"Observations at index {idx} has empty data.", data_type="observations", reason="Empty observation dataset", ) observations.append(o.data[0]) return observations def _validate_ensemble_sizes(self) -> None: """ Validate that models, forcing, and observations have matching sizes. Checks that the number of models equals the number of forcing instances and the number of observation datasets. Raises specific errors if sizes don't match. Raises ------ ConfigurationError If len(models) != len(forcing) or len(models) != len(observations), with details about which counts don't match. """ num_models = len(self.model) num_forcing = len(self.forcing) num_obs = len(self.obs) if num_models != num_forcing: raise ConfigurationError( f"Number of models ({num_models}) does not match " f"number of forcing datasets ({num_forcing}).", reason="Mismatched ensemble sizes", ) if num_models != num_obs: raise ConfigurationError( f"Number of models ({num_models}) does not match " f"number of observation datasets ({num_obs}).", reason="Mismatched ensemble sizes", ) def _validate_and_set_parameters( self, warmup: int, dump_dir: str, invert_obj_func: bool, dump_outputs: bool, dump_forcing: bool, ) -> None: """ Validate and set configuration parameters. Validates the warmup period and dump directory parameters, then sets all configuration attributes on the instance. Parameters ---------- warmup Number of days for model warmup period. Must be non-negative integer. dump_dir Directory path for saving dumped outputs. Only validated/created if dump_outputs or dump_forcing is True. invert_obj_func If True, multiply objective function result by -1 (for minimizers). dump_outputs If True, save all simulation outputs to disk. dump_forcing If True, save forcing data used in simulations. Raises ------ ConfigurationError If warmup is not a non-negative integer or if dump_dir cannot be created/accessed. """ if not isinstance(warmup, int) or warmup < 0: raise ConfigurationError( f"Warmup period must be non-negative integer, got {warmup}.", item_name="warmup", item_value=warmup, reason="Invalid warmup configuration", ) self.warmup = warmup if (dump_outputs or dump_forcing) and dump_dir: try: dump_path = os.path.abspath(dump_dir) os.makedirs(dump_path, exist_ok=True) self.dump_dir = dump_path except OSError as e: raise ConfigurationError( f'Cannot create dump directory "{dump_dir}": {e}', item_name="dump_dir", item_value=dump_dir, reason="Directory access error", ) from e else: self.dump_dir = "" self.dump_outputs = dump_outputs self.dump_forcing = dump_forcing self.invert_obj_func = invert_obj_func def _setup_forcing_and_models(self) -> None: """ Apply forcing operations and configure models. Applies parameter-dependent operations to all forcing datasets and configures models with forcing data if not using random forcing. Side Effects ----------- - Modifies all forcing datasets via apply_operations() - Configures models with forcing via set_forcing() (if not random_forcing) Raises ------ ModelError If forcing operations or model configuration fails. Original exception is preserved via exception chaining. """ try: for f in self.forcing: f.apply_operations(self.params) if not self.random_forcing: for m, f in zip(self.model, self.forcing): m.set_forcing(forcing=f) except Exception as e: raise ModelError( f"Failed to setup forcing and models: {e}", is_initialized=False ) from e def _validate_and_set_objective_function( self, obj_func: str | Callable[[np.ndarray, np.ndarray], float] | None ) -> None: """ Validate and set the objective function. Parameters ---------- obj_func Objective function specification. Can be: - None: Use default non-parametric Kling-Gupta Efficiency - str: Name of HydroErr metric (e.g., 'nse', 'kge_2012') - callable: User-defined function with signature (observed, simulated) -> float Raises ------ ConfigurationError If obj_func is not None, string, or callable. """ if obj_func is None: logger.info("Objective function: Non parametric Kling-Gupta Efficiency") self.obj_func = None elif isinstance(obj_func, str): self.obj_func = obj_func elif callable(obj_func): self.obj_func = obj_func else: raise ConfigurationError( f"Objective function must be None, string, or callable, " f"got {type(obj_func).__name__}.", item_name="obj_func", item_value=obj_func, reason="Invalid objective function type", )
[docs] def parameters(self) -> Any: """ Generate random parameter sets that satisfy constraints. Returns ------- Any SPOTPY parameter object with valid random parameter values. Raises ------ RuntimeError If unable to generate valid parameters after 1000 attempts. """ x = None for i in range(1000): x = spotpy.parameter.generate(self.params_spotpy) names = [row[1] for row in x] values = [row[0] for row in x] params = self.params param_values = dict(zip(names, values)) params.set_values(param_values, check_range=False) if params.constraints_satisfied() and params.range_satisfied(): break if i >= 1000: raise ModelError("The parameter constraints could not be satisfied.") return x
[docs] def simulation(self, x: spotpy.parameter) -> list[np.ndarray] | None: """ Run model simulation with given parameter set. Parameters ---------- x SPOTPY parameter object containing parameter values. Returns ------- list[np.ndarray] | None List of simulated discharge time series (one per model), or None if simulation failed or constraints were violated. """ params = self.params param_values = dict(zip(x.name, x.random)) logger.debug(f"Setting {len(param_values)} parameter values for simulation") params.set_values(param_values) if not params.constraints_satisfied(): logger.debug("Parameter constraints not satisfied, skipping simulation") return None if not params.range_satisfied(): logger.debug("Parameter ranges not satisfied, skipping simulation") return None logger.debug(f"Running {len(self.model)} model(s)") all_sim = [] for model_idx, (model, forcing) in enumerate(zip(self.model, self.forcing)): logger.debug(f" Model {model_idx}: running simulation") if self.random_forcing: logger.debug(" Applying random forcing operations") forcing.apply_operations(params, apply_to_all=False) model.run(parameters=params, forcing=forcing) else: model.run(parameters=params) sim = model.get_outlet_discharge() if sim.size == 0: logger.warning(f" Model {model_idx}: no outlet discharge data") return None logger.debug(f" Model {model_idx}: extracted {len(sim)} discharge values") all_sim.append(sim[self.warmup :]) if self.dump_outputs or self.dump_forcing: now = datetime.now() date_time = now.strftime("%Y-%m-%d_%H%M%S") path = os.path.join(self.dump_dir, date_time) os.makedirs(path, exist_ok=True) if self.dump_outputs: logger.debug(f" Dumping outputs to {path}") model.dump_outputs(path) if self.dump_forcing: logger.debug(f" Saving forcing to {path}") forcing.save_as(os.path.join(path, f"forcing_{model_idx}.nc")) logger.debug(f"Simulation complete: {len(all_sim)} models processed") return all_sim
[docs] def evaluation(self) -> list[np.ndarray]: all_obs = [] for obs in self.obs: all_obs.append(obs[self.warmup :]) return all_obs
[docs] def objectivefunction( self, simulation: list[np.ndarray], evaluation: list[np.ndarray], params: spotpy.parameter | None = None, ) -> float: if simulation is None: logger.error(f"Simulation results empty (params: {params}).") if self.invert_obj_func: return np.inf return -np.inf all_like = [] for sim, obs in zip(simulation, evaluation): if sim is None or sim.size == 0: logger.error(f"Simulation results empty (params: {params}).") like = -np.inf elif len(sim) != len(obs): logger.error( f"Length of simulation and observation do not match " f"(params: {params})." ) like = -np.inf else: if not self.obj_func: like = spotpy.objectivefunctions.kge_non_parametric(obs, sim) elif isinstance(self.obj_func, str): like = evaluate(sim, obs, self.obj_func) else: like = self.obj_func(obs, sim) if self.invert_obj_func: like = -like all_like.append(like) # Return the mean of the objective function return np.mean(all_like)
def evaluate(simulation: np.array, observation: np.array, metric: str) -> float: """ Evaluate the simulation using the provided metric (goodness of fit). Parameters ---------- simulation The predicted time series. observation The time series of the observations with dates matching the simulated series. metric The abbreviation of the function as defined in HydroErr (https://hydroerr.readthedocs.io/en/stable/list_of_metrics.html) Examples: nse, kge_2012, ... Returns ------- The value of the selected metric. """ eval_fct = getattr(importlib.import_module("HydroErr"), metric) return eval_fct(simulation, observation)