Source code for hydrobricks.models.model

from __future__ import annotations

import logging
import os
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any

import HydroErr
import numpy as np

from hydrobricks._exceptions import ConfigurationError, ModelError
from hydrobricks._hydrobricks import ModelHydro, close_log, init_log
from hydrobricks._utils import Timer, date_as_mjd, dump_config_file, validate_kwargs
from hydrobricks.actions.action import Action
from hydrobricks.forcing import Forcing
from hydrobricks.hydro_units import HydroUnits
from hydrobricks.models.model_settings import ModelSettings
from hydrobricks.parameters import ParameterSet
from hydrobricks.trainer import evaluate

if TYPE_CHECKING:
    from pathlib import Path

logger = logging.getLogger(__name__)


[docs] class Model(ABC): """Base abstract class for hydrological models""" @abstractmethod def __init__(self, name: str | None = None, **kwargs: Any) -> None: """ Initialize the Model base class. Parameters ---------- name Name identifier for the model instance. Default: None **kwargs Additional keyword arguments for model configuration. Allowed keys: 'solver', 'record_all', 'land_cover_types', 'land_cover_names' Raises ------ TypeError If unsupported keyword arguments are provided. """ self.name: str | None = name self.model: ModelHydro = ModelHydro() self.spatial_structure: HydroUnits | None = None self.allowed_kwargs: set[str] = { "solver", "record_all", "land_cover_types", "land_cover_names", } self._is_initialized: bool = False # Default options self.options: dict[str, Any] = dict() self.solver: str = "heun_explicit" self.record_all: bool = False self.land_cover_types: list[str] = ["ground"] self.land_cover_names: list[str] = ["ground"] self.allowed_land_cover_types: list[str] = ["ground"] self._set_basic_options(kwargs) # Structure self.structure: dict[str, Any] = dict() self.parameter_aliases: dict[str, str] = dict() self.parameter_constraints: list[tuple[str, ...]] = [] # Setting base settings self.settings: ModelSettings = ModelSettings( solver=self.solver, record_all=self.record_all ) def __del__(self) -> None: """Clean up resources when model is deleted.""" self._cleanup() @property def name(self) -> str | None: """Get the model name.""" return self._name @name.setter def name(self, name: str | None) -> None: """Set the model name.""" self._name = name
[docs] def setup( self, spatial_structure: HydroUnits, output_path: str | Path, start_date: str, end_date: str | None = None, ) -> None: """ Setup and initialize the model for simulation. Parameters ---------- spatial_structure The spatial structure of the catchment (hydro units). output_path Path to directory where results will be saved. start_date Starting date of the computation (format: 'YYYY-MM-DD'). end_date Ending date of the computation (format: 'YYYY-MM-DD'). If None, uses last date in time series. Default: None Raises ------ RuntimeError If the model has already been initialized. TypeError If arguments have incorrect types. FileNotFoundError If the output path cannot be created. Examples -------- >>> model = SomeModel() >>> model.setup(hydro_units, './output', '2020-01-01', '2020-12-31') """ if self._is_initialized: raise ModelError( "The model has already been initialized. Please create a new instance.", is_initialized=True, ) try: if isinstance(output_path, str) and not os.path.isdir(output_path): os.mkdir(output_path) self.spatial_structure = spatial_structure # Initialize log init_log(str(output_path)) # Modelling period self.settings.set_timer(start_date, end_date, 1, "day") # Initialize the model (with sub basin creation) self.model.init_with_basin( self.settings.settings, spatial_structure.settings ) self._is_initialized = True except ModelError: logger.error("Model initialization failed", exc_info=True) raise except OSError as e: logger.error( f"Error creating output directory {output_path}: {e}", exc_info=True ) raise ModelError(f"Failed to create output directory: {e}") from e except (TypeError, ValueError) as e: logger.error( f"Invalid argument type or value in model setup: {e}", exc_info=True ) raise ConfigurationError(f"Invalid configuration: {e}") from e
[docs] def run(self, parameters: ParameterSet, forcing: Forcing | None = None) -> None: """ Setup and run the model. Parameters ---------- parameters The parameters for the given model. forcing The forcing data. """ logger.debug(f"Running model: {self.name}") if not self._is_initialized: raise ModelError( "The model has not been initialized. Please run setup() first.", is_initialized=False, ) if not parameters.is_valid(): undefined = parameters.get_undefined() logger.debug(f"Invalid parameters: {undefined}") raise ConfigurationError( f"Some parameters were not defined: " f'{",".join(undefined)}.' ) try: logger.debug("Resetting model state") self.model.reset() if forcing is not None and not forcing.is_initialized(): logger.debug("Applying forcing operations") forcing.apply_operations(parameters) logger.debug("Setting parameter values") self._set_parameter_values(parameters) logger.debug("Setting forcing data") self._set_forcing(forcing) if not self.model.is_valid(): raise ConfigurationError("The model is not properly configured.") logger.debug("Starting model simulation") timer = Timer(text="Model simulation completed in {seconds:.2f} seconds") timer.start() self.model.run() timer.stop() except ModelError: logger.error("Model execution failed", exc_info=True) raise except ConfigurationError: logger.error("Configuration error during model run", exc_info=True) raise except (TypeError, ValueError) as e: logger.error(f"Model run failed: {e}", exc_info=True) raise ModelError(f"Model run failed: {e}") from e
@staticmethod def _cleanup() -> None: close_log()
[docs] def initialize_state_variables( self, parameters: ParameterSet, forcing: Forcing | None = None ) -> None: """ Run the model and save the state variables as initial values. Parameters ---------- parameters The parameters for the given model. forcing The forcing data. """ self.run(parameters, forcing) self.model.save_as_initial_state()
[docs] def set_forcing(self, forcing: Forcing) -> None: """ Set the forcing data. Parameters ---------- forcing The forcing data. """ if not self._is_initialized: raise ModelError( "The model has not been initialized. " "Please run setup() before setting forcing data.", is_initialized=False, ) self.model.clear_time_series() time = forcing.data2D.time.to_numpy() time = date_as_mjd(time) ids = self.spatial_structure.get_ids().to_numpy().flatten() for data_name, data in zip(forcing.data2D.data_name, forcing.data2D.data): data_name = str(data_name) if data is None: raise ModelError( f"The forcing {data_name} has not been spatialized.", is_initialized=True, ) if not self.model.create_time_series(data_name, time, ids, data): raise ModelError("Failed adding time series.") if not self.model.attach_time_series_to_hydro_units(): raise ModelError("Attaching time series failed.")
[docs] def add_action(self, action: Action) -> bool: """ Add an action to the model. Parameters ---------- action The action object. The dates must be sorted. """ if not action.is_initialized: raise ModelError(f"The action {action.name} has not been initialized.") if not self._is_initialized: raise ModelError( "The model has not been initialized. " "Please run setup() before adding actions.", is_initialized=False, ) return self.model.add_action(action.action)
[docs] def get_action_count(self) -> int: """ Get the number of actions (types of actions) registered in the model. """ return self.model.get_action_count()
[docs] def get_sporadic_action_item_count(self) -> int: """ Get the number of sporadic (non-recursive) action items (individual triggers) registered in the model. """ return self.model.get_sporadic_action_item_count()
[docs] def create_config_file( self, directory: str, name: str, file_type: str = "both" ) -> None: """ Create a configuration file describing the model structure. Such a file can be used when using the command-line version of hydrobricks. It contains the options to generate the corresponding model structure. Parameters ---------- directory The directory to write the file. name The name of the generated file. file_type The type of file to generate: 'json', 'yaml', or 'both'. """ settings = { "base": self.name, "solver": self.solver, "options": self.options, "land_covers": { "names": self.land_cover_names, "types": self.land_cover_types, }, "logger": "all" if self.record_all else "", } dump_config_file(settings, directory, name, file_type)
[docs] def get_outlet_discharge(self) -> np.ndarray: """ Get the computed outlet discharge. """ return self.model.get_outlet_discharge()
[docs] def get_total_outlet_discharge(self) -> float: """ Get the outlet discharge total. """ return self.model.get_total_outlet_discharge()
[docs] def get_total_et(self) -> float: """ Get the total amount of water lost by evapotranspiration. """ return self.model.get_total_et()
[docs] def get_total_water_storage_changes(self) -> float: """ Get the total change in water storage. """ return self.model.get_total_water_storage_changes()
[docs] def get_total_snow_storage_changes(self) -> float: """ Get the total change in snow storage. """ return self.model.get_total_snow_storage_changes()
[docs] def dump_outputs(self, path: str) -> None: """ Write the model outputs to a netcdf file. Parameters ---------- path Path to the target file. """ self.model.dump_outputs(path)
[docs] def eval(self, metric: str, observations: np.ndarray, warmup: int = 0) -> float: """ Evaluate the simulation using the provided metric (goodness of fit). Parameters ---------- metric The abbreviation of the function as defined in HydroErr (https://hydroerr.readthedocs.io/en/stable/list_of_metrics.html) Examples: nse, kge_2012, ... observations The time series of the observations with dates matching the simulated series. warmup The number of days of warmup period. This option is used to discard the warmup period from the evaluation. It is useful when conducting a run with a specific parameter set and comparing its score with those from the calibration. By setting the 'warmup' value, you can ensure fair assessments by discarding outputs from the specified warmup period (as is done automatically during calibration). Returns ------- The value of the selected metric. """ return evaluate( self.get_outlet_discharge()[warmup:], observations[warmup:], metric )
[docs] def generate_parameters(self) -> ParameterSet: """ Generate a ParameterSet for the model based on its structure. Automatically creates parameter definitions based on the model structure, applies any defined aliases, and applies any defined constraints. Returns ------- ParameterSet A ParameterSet object with all model parameters defined. """ ps = ParameterSet() ps.generate_parameters( self.land_cover_types, self.land_cover_names, self.options, self.structure ) for alias_key, alias_value in self.parameter_aliases.items(): if ps.has(alias_key): ps.add_aliases(alias_key, alias_value) for constraint in self.parameter_constraints: ps.define_constraint(*constraint) return ps
@abstractmethod def _define_structure(self) -> None: raise ConfigurationError( f"The structure has to be defined by the child class (named {self.name}).", reason="Abstract method not implemented", ) @abstractmethod def _set_specific_options(self, kwargs: dict[str, Any]) -> None: raise ConfigurationError( f"The specific options have to be defined by " f"the child class (named {self.name}).", reason="Abstract method not implemented", ) def _set_options(self, kwargs: dict[str, Any]) -> None: """ Internal method to configure model options. Processes keyword arguments, validates them against allowed options, applies basic and specific options, and validates cover types. Parameters ---------- kwargs Keyword arguments containing model configuration options. Raises ------ RuntimeError If land cover names and types don't match or invalid types are provided. """ self._add_allowed_kwargs(self.options.keys()) self._validate_kwargs(kwargs) for key, value in kwargs.items(): if key in ["solver", "record_all", "land_cover_types", "land_cover_names"]: continue self.options[key] = value self._set_specific_options(kwargs) self._check_cover_types() def _check_cover_types(self) -> None: """ Validate that land cover names and types are consistent. Verifies that: - Names and types lists have same length - All cover types are in the allowed list for this model Raises ------ RuntimeError If validation fails. """ if len(self.land_cover_names) != len(self.land_cover_types): raise ConfigurationError( "The length of the land cover names and types do not match.", reason="Mismatched array sizes", ) # Check allowed land cover types: ground, glacier for cover_type in self.land_cover_types: if cover_type not in self.allowed_land_cover_types: raise ConfigurationError( f"The land cover {cover_type} is not used in Socont", item_name="land_cover_types", item_value=cover_type, reason="Invalid land cover type", ) def _set_basic_options(self, kwargs: dict[str, Any]) -> None: """ Set basic solver and model configuration options from kwargs. Extracts and applies the following options if present: - 'solver': Numerical solver name - 'record_all': Whether to record all state/flux values - 'land_cover_types': List of land cover types - 'land_cover_names': List of land cover names Parameters ---------- kwargs Keyword arguments containing basic options. """ if "solver" in kwargs: self.solver = kwargs["solver"] if "record_all" in kwargs: self.record_all = kwargs["record_all"] if "land_cover_types" in kwargs: self.land_cover_types = kwargs["land_cover_types"] if "land_cover_names" in kwargs: self.land_cover_names = kwargs["land_cover_names"] def _add_allowed_kwargs(self, kwargs: Any) -> None: """ Add keys to the set of allowed keyword arguments. Parameters ---------- kwargs Keys (as iterable) to add to allowed_kwargs set. """ self.allowed_kwargs.update(kwargs) def _validate_kwargs(self, kwargs: dict[str, Any]) -> None: """ Validate that all provided keyword arguments are allowed. Parameters ---------- kwargs Keyword arguments to validate against allowed_kwargs. Raises ------ TypeError If any kwargs are not in the allowed set. """ # Validate optional keyword arguments. validate_kwargs(kwargs, self.allowed_kwargs) def _generate_structure(self) -> None: """ Generate the complete model structure. Creates all bricks (land covers, storages, etc.) and their processes based on the model's structure definition. This includes: - Creating basic structure elements - Adding all defined bricks - Adding brick parameters - Adding brick processes - Setting up logging Raises ------ RuntimeError If structure creation fails. """ # Generate basic elements self._set_structure_basics() # Generate the structure for key, brick in self.structure.items(): # Select or add the brick self._set_structure_brick(brick, key) # Add brick parameters if any if "parameters" in brick: for param, value in brick["parameters"].items(): self.settings.add_brick_parameter(param, value) # Add brick processes if any if "processes" in brick: for process, process_data in brick["processes"].items(): self._set_structure_process(key, process, process_data) self.settings.add_logging_to("outlet") def _set_structure_basics(self) -> None: """ Generate basic model structure elements. Sets up the fundamental structure including precipitation splitting, land covers, and snowpack processes based on model options. This method extracts relevant options and calls settings methods to generate the base structure. """ with_snow = True snow_melt_process = "melt:degree_day" snow_ice_transformation = None snow_redistribution = None if "with_snow" in self.options: with_snow = self.options["with_snow"] if "snow_melt_process" in self.options: with_snow = True snow_melt_process = self.options["snow_melt_process"] if "snow_ice_transformation" in self.options: snow_ice_transformation = self.options["snow_ice_transformation"] if "snow_redistribution" in self.options: snow_redistribution = self.options["snow_redistribution"] self.settings.generate_base_structure( self.land_cover_names, self.land_cover_types, with_snow=with_snow, snow_melt_process=snow_melt_process, snow_ice_transformation=snow_ice_transformation, snow_redistribution=snow_redistribution, ) def _set_structure_brick(self, brick: dict[str, Any], key: str) -> None: """ Add or select a brick in the model structure. Parameters ---------- brick Brick definition dictionary containing 'kind' and 'attach_to' keys. key Name/identifier for the brick. Raises ------ RuntimeError If brick has an invalid 'attach_to' value. """ if brick["kind"] == "land_cover": self.settings.select_hydro_unit_brick(key) else: if brick["attach_to"] == "hydro_unit": self.settings.add_hydro_unit_brick(key, brick["kind"]) elif brick["attach_to"] == "sub_basin": self.settings.add_sub_basin_brick(key, brick["kind"]) else: raise ConfigurationError( f'Brick {key} has an invalid "attach_to" value.', item_name="attach_to", item_value=brick.get("attach_to"), reason="Invalid value", ) def _set_structure_process( self, key: str, process: str, process_data: dict[str, Any] ) -> None: """ Add a process to a brick in the model structure. Parameters ---------- key Name of the brick containing this process. process Name/identifier for the process. process_data Process definition dictionary containing 'kind', 'target', and optional 'log' and 'instantaneous' keys. Raises ------ RuntimeError If process lacks a required target (unless it's an ET process). """ instantaneous = False if "instantaneous" in process_data: instantaneous = process_data["instantaneous"] log = False if "log" in process_data: log = process_data["log"] target = "" if "target" in process_data: target = process_data["target"] else: kind = process_data["kind"] if not (kind.startswith("et:") or kind.startswith("interception:")): raise ConfigurationError( f"Brick {key} has a process ({process}) without a target.", item_name="target", reason="Missing required target", ) self.settings.add_brick_process( process, process_data["kind"], target, log=log, instantaneous=instantaneous ) def _set_parameter_values(self, parameters: ParameterSet) -> None: """ Apply parameter values to the model. Iterates through model parameters and sets them in the model settings, then updates the underlying C++ model with the new parameters. Parameters ---------- parameters ParameterSet containing the parameter values to apply. Raises ------ RuntimeError If setting parameter values fails. """ model_params = parameters.get_model_parameters() for _, param in model_params.iterrows(): if not self.settings.set_parameter_value( param["component"], param["name"], param["value"] ): raise ModelError("Failed setting parameter values.") self.model.update_parameters(self.settings.settings) def _set_forcing(self, forcing: Forcing | None) -> None: """ Attach forcing data to the model. If forcing is provided, spatializes it and attaches it to hydro units. If no forcing is provided, verifies that forcing was previously loaded. Parameters ---------- forcing Forcing object with meteorological data, or None if already set. Raises ------ RuntimeError If no forcing data is available. """ if forcing is not None: self.set_forcing(forcing) elif not self.model.forcing_loaded(): raise ModelError( "Please provide the forcing data at least once.", is_initialized=False )