Source code for hydrobricks.forcing

from __future__ import annotations

import logging
from enum import auto
from pathlib import Path
from typing import TYPE_CHECKING, Any

import numpy as np
import pandas as pd
from cftime import num2date

from hydrobricks import Dataset, pyet
from hydrobricks._constants import TO_RAD
from hydrobricks._exceptions import (
    ConfigurationError,
    DataError,
    DependencyError,
    ForcingError,
)
from hydrobricks._optional import HAS_NETCDF, HAS_PYET, StrEnumClass
from hydrobricks.parameters import ParameterSet
from hydrobricks.time_series import TimeSeries1D, TimeSeries2D

logger = logging.getLogger(__name__)

# Maps canonical PET method names (and their short aliases) to pyet function names.
_PET_METHOD_MAP: dict[str, str] = {
    "Penman": "penman",
    "penman": "penman",
    "Penman-Monteith": "pm",
    "pm": "pm",
    "ASCE-PM": "pm_asce",
    "pm_asce": "pm_asce",
    "FAO-56": "pm_fao56",
    "pm_fao56": "pm_fao56",
    "Priestley-Taylor": "priestley_taylor",
    "priestley_taylor": "priestley_taylor",
    "Kimberly-Penman": "kimberly_penman",
    "kimberly_penman": "kimberly_penman",
    "Thom-Oliver": "thom_oliver",
    "thom_oliver": "thom_oliver",
    "Blaney-Criddle": "blaney_criddle",
    "blaney_criddle": "blaney_criddle",
    "Hamon": "hamon",
    "hamon": "hamon",
    "Romanenko": "romanenko",
    "romanenko": "romanenko",
    "Linacre": "linacre",
    "linacre": "linacre",
    "Haude": "haude",
    "haude": "haude",
    "Turc": "turc",
    "turc": "turc",
    "Jensen-Haise": "jensen_haise",
    "jensen_haise": "jensen_haise",
    "McGuinness-Bordne": "mcguinness_bordne",
    "mcguinness_bordne": "mcguinness_bordne",
    "Hargreaves": "hargreaves",
    "hargreaves": "hargreaves",
    "FAO-24 radiation": "fao_24",
    "fao_24": "fao_24",
    "Abtew": "abtew",
    "abtew": "abtew",
    "Makkink": "makkink",
    "makkink": "makkink",
    "Oudin": "oudin",
    "oudin": "oudin",
}

if TYPE_CHECKING:
    from hydrobricks.catchment import Catchment
    from hydrobricks.hydro_units import HydroUnits


[docs] class Forcing: """Class for managing forcing (meteorological) data for hydrological models."""
[docs] class Variable(StrEnumClass): """Enumeration of supported meteorological variables.""" P = auto() # Precipitation [mm] T = auto() # Temperature [°C] T_MIN = auto() # Minimum temperature [°C] T_MAX = auto() # Maximum temperature [°C] T_DEW_POINT = auto() # Dew point temperature [°C] PET = auto() # Potential evapotranspiration [mm] RH = auto() # Relative humidity [%] RH_MIN = auto() # Minimum relative humidity [%] RH_MAX = auto() # Maximum relative humidity [%] R_NET = auto() # Net radiation [MJ m-2 d-1] R_SOLAR = auto() # Solar radiation [MJ m-2 d-1] SD = auto() # Sunshine duration [h] WIND = auto() # Wind speed [m s-1] PRESSURE = auto() # Atmospheric pressure [kPa]
def __init__(self, spatial_entity: HydroUnits | Catchment) -> None: """ Initialize Forcing object for a spatial entity. Parameters ---------- spatial_entity Either a HydroUnits or Catchment object defining the spatial structure. Raises ------ TypeError If spatial_entity is not HydroUnits or Catchment. ValueError If the hydro_units object is empty. """ # Import here to avoid circular imports from hydrobricks.catchment import Catchment from hydrobricks.hydro_units import HydroUnits if isinstance(spatial_entity, HydroUnits): hydro_units = spatial_entity catchment = None elif isinstance(spatial_entity, Catchment): hydro_units = spatial_entity.hydro_units catchment = spatial_entity else: raise ConfigurationError( f"The spatial_entity argument must be a HydroUnits or " f"Catchment object, not {type(spatial_entity)}.", item_name="spatial_entity", item_value=type(spatial_entity).__name__, reason="Invalid type", ) # Check hydro units if len(hydro_units.hydro_units) == 0: raise ConfigurationError( "The hydro_units object contains no hydrological units.", reason="Empty hydro units", ) super().__init__() self.data1D: TimeSeries1D = TimeSeries1D() self.data2D: TimeSeries2D = TimeSeries2D() self.catchment: Catchment | None = catchment self.hydro_units: pd.DataFrame = hydro_units.hydro_units self._operations: list[dict[str, Any]] = [] self._is_initialized: bool = False
[docs] def is_initialized(self) -> bool: """ Check if the forcing has been initialized. Returns ------- bool True if the forcing has been initialized, False otherwise. """ return self._is_initialized
[docs] def get_variable_enum(self, variable: str) -> Variable: """ Match a variable name string to the corresponding Variable enum value. Parameters ---------- variable Variable name or alias (e.g., 'precipitation', 'precip', 'p', 'P'). Returns ------- Variable The corresponding Variable enum value. Raises ------ ValueError If the variable name is not recognized. Examples -------- >>> forcing = Forcing(hydro_units) >>> var = forcing.get_variable_enum('precip') >>> var == Forcing.Variable.P True """ if variable in self.Variable.__members__: return self.Variable[variable] # Dictionary mapping variable aliases to Variable enum variable_aliases = { "precipitation": self.Variable.P, "precip": self.Variable.P, "p": self.Variable.P, "temperature": self.Variable.T, "temp": self.Variable.T, "t": self.Variable.T, "temperature_min": self.Variable.T_MIN, "min_temperature": self.Variable.T_MIN, "t_min": self.Variable.T_MIN, "tmin": self.Variable.T_MIN, "temperature_max": self.Variable.T_MAX, "max_temperature": self.Variable.T_MAX, "t_max": self.Variable.T_MAX, "tmax": self.Variable.T_MAX, "pet": self.Variable.PET, "relative_humidity": self.Variable.RH, "rh": self.Variable.RH, "relative_humidity_min": self.Variable.RH_MIN, "min_relative_humidity": self.Variable.RH_MIN, "rh_min": self.Variable.RH_MIN, "rhmin": self.Variable.RH_MIN, "relative_humidity_max": self.Variable.RH_MAX, "max_relative_humidity": self.Variable.RH_MAX, "rh_max": self.Variable.RH_MAX, "rhmax": self.Variable.RH_MAX, "net_radiation": self.Variable.R_NET, "r_net": self.Variable.R_NET, "r_n": self.Variable.R_NET, "rn": self.Variable.R_NET, "solar_radiation": self.Variable.R_SOLAR, "r_solar": self.Variable.R_SOLAR, "r_s": self.Variable.R_SOLAR, "rs": self.Variable.R_SOLAR, "sunshine_duration": self.Variable.SD, "sd": self.Variable.SD, "wind_speed": self.Variable.WIND, "wind": self.Variable.WIND, "pressure": self.Variable.PRESSURE, } if variable in variable_aliases: logger.debug( f"Resolved variable alias '{variable}' to {variable_aliases[variable]}" ) return variable_aliases[variable] raise ForcingError(f"Variable {variable} is not recognized.", variable=variable)
def _can_be_negative(self, variable: Variable) -> bool: """ Check if a given variable can have negative values. Parameters ---------- variable The Variable to check. Returns ------- bool True if the variable can be negative, False otherwise. Raises ------ ValueError If the variable is not recognized. """ if variable in [ self.Variable.P, self.Variable.PET, self.Variable.RH, self.Variable.RH_MIN, self.Variable.RH_MAX, self.Variable.R_NET, self.Variable.R_SOLAR, self.Variable.SD, self.Variable.WIND, self.Variable.PRESSURE, ]: return False elif variable in [ self.Variable.T, self.Variable.T_MIN, self.Variable.T_MAX, self.Variable.T_DEW_POINT, ]: return True else: raise ForcingError( f"Undefined if variable {variable} can be negative.", variable=variable )
[docs] def load_station_data_from_csv( self, path: str | Path, column_time: str, time_format: str, content: dict[str, str] | None = None, ) -> None: """ Read 1D time series data from CSV file for a single station. Parameters ---------- path Path to the CSV file containing station data. column_time Column name containing the time values. time_format Format string for parsing time values (e.g., '%Y-%m-%d'). content Dictionary mapping variable names/aliases to CSV column names. Example: {'precipitation': 'Precipitation (mm)', 'temperature': 'Temp (C)'} Default: None Raises ------ FileNotFoundError If the CSV file does not exist. KeyError If required columns are not found in the CSV file. Examples -------- >>> forcing.load_station_data_from_csv( ... 'weather.csv', ... 'Date', ... '%Y-%m-%d', ... {'precipitation': 'P (mm)', 'temperature': 'T (C)'} ... ) """ # Change the variable names (key) to the enum corresponding values for key in list(content.keys()): enum_val = self.get_variable_enum(key) content[enum_val] = content.pop(key) self.data1D.load_from_csv(path, column_time, time_format, content)
[docs] def correct_station_data(self, **kwargs: Any) -> None: """ Define prior correction operations to apply to forcing data. Parameters ---------- **kwargs Keyword arguments defining the correction operation. Required keys: - variable : str Name of the variable to correct. - method : str Correction method: 'additive' or 'multiplicative' - correction_factor : float Value of the correction factor (to add or multiply). Examples -------- >>> forcing.correct_station_data( ... variable='temperature', ... method='additive', ... correction_factor=0.5 # Add 0.5 degrees ... ) """ kwargs["type"] = "prior_correction" self._operations.append(kwargs)
[docs] def spatialize_from_station_data(self, **kwargs: Any) -> None: """ Define the spatialization operations from station data to all hydro units. Parameters ---------- variable : str Name of the variable to spatialize. method : str Name of the method to use. Can be: * constant: the same value will be used * additive_elevation_gradient: use of an additive elevation gradient that is either constant or changes for every month. Parameters: 'ref_elevation', 'gradient'. * multiplicative_elevation_gradient: use of a multiplicative elevation gradient that is either constant or changes for every month. Parameters: 'ref_elevation', 'gradient'. * multiplicative_elevation_threshold_gradients: same as multiplicative_elevation_gradient, but with an elevation threshold with a gradient below and a gradient above. Parameters: 'ref_elevation', 'gradient', 'gradient_2', 'elevation_threshold' ref_elevation : float Reference (station) elevation. For method(s): 'elevation_gradient' gradient : float/list Gradient of the variable to apply per 100m (e.g., °C/100m). Can be a unique value or a list providing a value for every month. For method(s): 'elevation_gradient', 'elevation_multi_gradients' gradient_1 : float/list Same as parameter 'gradient' gradient_2 : float/list Gradient of the variable to apply per 100m (e.g., °C/100m) for the units above the elevation threshold defined by 'elevation_threshold'. For method(s): 'elevation_multi_gradients' elevation_threshold : int/float Threshold elevation to switch from gradient to gradient_2. For method(s): 'elevation_multi_gradients' """ kwargs["type"] = "spatialize_from_station" self._operations.append(kwargs)
[docs] def spatialize_from_gridded_data(self, **kwargs: Any) -> None: """ Define the spatialization operations from gridded data to all hydro units. Parameters ---------- variable : str Name of the variable to spatialize. method : str Name of the method to use. Can be: * regrid_from_netcdf: regrid data from a single or multiple netCDF files. path : str|Path Path to the file containing the data or to a folder containing multiple files. file_pattern : str, optional Pattern of the files to read. If None, the path is considered to be a single file. data_crs : int, optional CRS (as EPSG id) of the data file. If None, the CRS is read from the file. var_name : str Name of the variable to read. dim_time : str Name of the time dimension. dim_x : str Name of the x dimension. dim_y : str Name of the y dimension. raster_hydro_units : str|Path Path to a raster containing the hydro unit ids to use for the spatialization. apply_data_gradient : bool, optional If True, elevation-based gradients will be retrieved from the data and applied to the hydro units (e.g., for temperature and precipitation). If False, the data will be regridded without applying any gradient. Default is True for temperature and precipitation variables, and False for other variables. """ kwargs["type"] = "spatialize_from_grid" self._operations.append(kwargs)
[docs] def compute_pet(self, **kwargs: Any) -> None: """ Define the PET computation operations using the pyet library. The PET will be computed for all hydro units. Parameters ---------- method : str Name of the method to use. Possible values are those provided in the table from the pyet documentation: https://pypi.org/project/pyet/. The method name or the pyet function name can be used. use : list List of the meteorological variables to use to compute the PET. Only the variables listed here will be used. The variables must be named according to pyet naming convention (see pyet API documentation: https://pyet.readthedocs.io/en/latest/api/index.html). These variables must be available (data loaded in forcing) and spatialized. Example: use=['t', 'tmin', 'tmax', 'lat', 'elevation'] lat : float, optional Latitude of the catchment. If not provided, the latitude computed for each hydro unit will be used. other options : see pyet documentation for function-specific options. These options will be passed to the pyet function. Raises ------ DependencyError If pyet is not installed. ValueError If required variables are not available. """ if not HAS_PYET: raise DependencyError( "PyEt is required for PET computation.", package_name="pyet", operation="Forcing.compute_pet", install_command="pip install pyet", ) kwargs["type"] = "compute_pet" self._operations.append(kwargs)
[docs] def apply_operations( self, parameters: ParameterSet | None = None, apply_to_all: bool = True ) -> None: """ Apply the pre-defined operations. Executes all spatialization, correction, and PET computation operations that were previously defined. Operations are applied in a fixed order: 1. Prior corrections 2. Station data spatialization 3. Gridded data spatialization 4. PET computation Parameters ---------- parameters The parameter object instance. Required if operations reference parameters using the 'param:' prefix. apply_to_all If True, the operations will be applied to all variables. If False, the operations will only be applied to the variables related to parameters defined in the parameters.allow_changing list. This is useful to avoid re-applying, during the calibration phase, operations that have already been applied previously. Default: True Raises ------ ValueError If operations reference parameters but no parameter object is provided. """ logger.debug( f"Applying forcing operations: apply_to_all={apply_to_all}, " f"parameters={'provided' if parameters else 'None'}" ) # The operations will be applied in the order defined in the list operation_types = [ "prior_correction", "spatialize_from_station", "spatialize_from_grid", "compute_pet", ] for operation_type in operation_types: logger.debug(f" Applying {operation_type} operations") self._apply_operations_of_type(operation_type, parameters, apply_to_all) logger.debug("All forcing operations completed successfully") self._is_initialized = True
[docs] def save_as(self, path: str | Path, max_compression: bool = False) -> None: """ Create a netCDF file with the forcing data. Saves the 2D spatialized forcing data to a netCDF4 file with the structure suitable for later loading with load_from(). Parameters ---------- path Path of the file to create. max_compression Option to allow maximum compression for data in file. When True, uses compression with least_significant_digit=3 for better storage efficiency. Default: False Raises ------ ImportError If netcdf4 is not installed. Notes ----- If apply_operations() has not been called, it will be called automatically before saving to ensure data is properly spatialized. """ if not HAS_NETCDF: raise ImportError("netcdf4 is required to do this.") if not self.is_initialized(): logger.info("Applying operations before saving...") self.apply_operations() self._is_initialized = True time = self.data2D.get_dates_as_mjd() # Create netCDF file using context manager for proper resource handling logger.debug(f"Creating netCDF file: {path}") with Dataset(path, "w", "NETCDF4") as nc: # Dimensions nc.createDimension("hydro_units", len(self.hydro_units)) nc.createDimension("time", len(time)) # Variables var_id = nc.createVariable("id", "int", ("hydro_units",)) var_id[:] = self.hydro_units["id"] var_time = nc.createVariable("time", "float32", ("time",)) var_time[:] = time var_time.units = "days since 1858-11-17 00:00:00" var_time.comment = "Modified Julian Day Number" for idx, variable in enumerate(self.data2D.data_name): if max_compression: var_data = nc.createVariable( variable, "float32", ("time", "hydro_units"), zlib=True, least_significant_digit=3, ) else: var_data = nc.createVariable( variable, "float32", ("time", "hydro_units"), zlib=True ) var_data[:, :] = self.data2D.data[idx] logger.debug("NetCDF file created successfully")
[docs] def load_from(self, path: str | Path) -> None: """ Load data from a netCDF file created using save_as(). Reads a previously saved netCDF file containing spatialized forcing data and loads it into the Forcing object's data2D structure. Parameters ---------- path Path of the file to read. Raises ------ ImportError If netcdf4 is not installed. ValueError If the hydro units in the file don't match the Forcing object's hydro units. Notes ----- The loaded data will have the same structure as if it was created through spatialization operations. The Forcing object will be marked as initialized after successful loading. """ if not HAS_NETCDF: raise ImportError("netcdf4 is required to do this.") # Open and read netCDF file using context manager for proper resource handling logger.debug(f"Loading forcing data from netCDF file: {path}") with Dataset(path, "r", "NETCDF4") as nc: # Check that hydro units are the same hydro_units_nc = nc.variables["id"][:] hydro_units_def = self.hydro_units[("id", "-")].values if not np.array_equal(hydro_units_nc, hydro_units_def): raise DataError( "The hydrological units in the netCDF file are not " "the same as those in the forcing object. The netCDF file " "contains hydrological units with ids: " f"{hydro_units_nc}. The model contains hydrological " f"units with ids: {hydro_units_def}.", data_type="netCDF hydro units", reason="ID mismatch", ) # Load time ts = num2date(nc.variables["time"][:], units=nc.variables["time"].units) self.data2D.time = [pd.Timestamp(dt.year, dt.month, dt.day) for dt in ts] self.data2D.time = pd.Series(self.data2D.time) # Load variable names self.data2D.data_name = [ self.get_variable_enum(var) for var in nc.variables if var not in ["id", "time"] ] # Load data self.data2D.data = [] for variable in self.data2D.data_name: self.data2D.data.append(nc.variables[variable][:]) logger.debug( f"Successfully loaded forcing data with " f"{len(self.data2D.data_name)} variables" )
[docs] def get_total_precipitation(self) -> float: """ Calculate the catchment-average total precipitation. Computes the weighted average precipitation across all hydro units based on their areas. Returns ------- float Total precipitation in mm (or original units if data is in different units). """ idx = self.data2D.data_name.index(self.Variable.P) data = self.data2D.data[idx].sum(axis=0) areas = self.hydro_units[("area", "m2")] tot_precip = data * areas.values / areas.sum() return tot_precip.sum()
def _apply_operations_of_type( self, operation_type: str, parameters: ParameterSet | None = None, apply_to_all: bool = True, ) -> None: """ Apply operations of a specific type in sequence. Iterates through operations of the given type and applies them with the appropriate parameters. Substitutes parameter references (param: prefix) with actual parameter values. Parameters ---------- operation_type Type of operations to apply: - 'prior_correction': Applies additive/multiplicative corr. to station data - 'spatialize_from_station': Converts point obs. to distributed values - 'spatialize_from_grid': Resamples gridded data to hydro units - 'compute_pet': Calculates potential evapotranspiration parameters Parameter object for substituting parameter references. Required if operations use 'param:' prefix. apply_to_all If False, only apply operations for variables in allow_changing list. Raises ------ ValueError If operation references parameters but no parameter object provided. """ # Dictionary mapping operation types to their handler methods operation_handlers = { "prior_correction": self._apply_prior_correction, "spatialize_from_station": self._apply_spatialization_from_station_data, "spatialize_from_grid": self._apply_spatialization_from_gridded_data, "compute_pet": self._apply_pet_computation, } for operation_ref in self._operations: operation = operation_ref.copy() if operation["type"] != operation_type: continue # Remove the type key operation.pop("type") # Extract the operation options apply_operation = False for key in operation: value = operation[key] if isinstance(value, str) and value.startswith("param:"): if parameters is None: raise ConfigurationError( "A parameters object must be provided " "to apply the operations as it is " f'required by the "{value}" option.', item_name="forcing", reason="Missing required parameter", ) parameter_name = value.replace("param:", "") operation[key] = parameters.get(parameter_name) if not apply_to_all: # Restrict to parameters that changed if parameter_name in parameters.allow_changing: apply_operation = True # Apply the operation (or not) if apply_to_all or apply_operation: if operation_type not in operation_handlers: raise ConfigurationError( f"Unknown operation type: {operation_type}", item_name="operation_type", item_value=operation_type, reason="Unknown operation", ) logger.debug(f"Applying {operation_type} operation") operation_handlers[operation_type](**operation) def _apply_prior_correction( self, variable: str, method: str = "multiplicative", **kwargs: Any ) -> None: """ Apply a prior correction to 1D station data. Modifies the loaded station data by applying an additive or multiplicative correction factor to a specific variable. Parameters ---------- variable Name or alias of the variable to correct. method Correction method: 'multiplicative' (default) or 'additive'. **kwargs Must contain 'correction_factor' key with the factor value. Raises ------ ValueError If correction_factor not provided or unknown method used. """ variable = self.get_variable_enum(variable) idx = self.data1D.data_name.index(variable) # Extract kwargs (None if not provided) correction_factor = kwargs.get("correction_factor", None) if correction_factor is None: raise ForcingError( "Correction factor not provided.", variable=str(variable), method=method ) if method == "multiplicative": correction_factor = kwargs["correction_factor"] # Do not use in-place mutation (*=) to avoid issues with read-only arrays self.data1D.data[idx] = self.data1D.data[idx] * correction_factor elif method == "additive": correction_factor = kwargs["correction_factor"] # Do not use in-place mutation (+=) to avoid issues with read-only arrays self.data1D.data[idx] = self.data1D.data[idx] + correction_factor else: raise ForcingError( f"Unknown correction method: {method}", variable=str(variable), method=method, ) def _apply_spatialization_from_station_data( self, variable: str, method: str = "default", **kwargs: Any ) -> None: """ Spatialize 1D station data to 2D hydro units using elevation gradients. Converts point observations from a station to distributed values across hydro units using elevation-dependent methods (additive or multiplicative gradients). Parameters ---------- variable Name or alias of the variable to spatialize. method Spatialization method: 'constant', 'additive_elevation_gradient', 'multiplicative_elevation_gradient', or 'multiplicative_elevation_threshold_gradients'. **kwargs Required/optional parameters depend on the method: - ref_elevation: Reference station elevation (required for gradient methods) - gradient: Elevation gradient value(s) - gradient_2: Second gradient for threshold method Raises ------ ValueError If required parameters are missing or invalid method specified. """ # Checking that the correction_factor option is not used here anymore if "correction_factor" in kwargs: raise ConfigurationError( "The correction_factor option is to be used only " "in the correct_station_data() method.", item_name="correction_factor", reason="Wrong method for this option", ) variable = self.get_variable_enum(variable) unit_values = np.zeros((len(self.data1D.time), len(self.hydro_units))) hydro_units = self.hydro_units.reset_index() idx_1d = self.data1D.data_name.index(variable) data_raw = self.data1D.data[idx_1d].copy() # Specify default methods if method == "default": if variable == self.Variable.T: method = "additive_elevation_gradient" elif variable == self.Variable.P: method = "multiplicative_elevation_gradient" elif variable == self.Variable.PET: method = "constant" else: raise ForcingError( f"Unknown default method for variable: {variable}", variable=str(variable), method="spatialization", ) # Extract kwargs (None if not provided) ref_elevation = kwargs.get("ref_elevation", None) gradient = kwargs.get("gradient", None) # Check inputs if gradient is None: gradient = kwargs.get("gradient_1", None) if isinstance(gradient, list): if len(gradient) not in [1, 12]: raise ConfigurationError( f"The gradient should have a length of 1 or 12. " f"Here: {len(gradient)}", item_name="gradient", item_value=len(gradient), reason="Invalid length", ) # Apply methods for i_unit, (_, unit) in enumerate(hydro_units.iterrows()): elevation = unit["elevation"].values if method == "constant": unit_values[:, i_unit] = data_raw elif method == "additive_elevation_gradient": if ref_elevation is None: raise ConfigurationError( "Reference elevation not provided.", item_name="ref_elevation", reason="Missing required parameter", ) if ( isinstance(gradient, float) or isinstance(gradient, list) and len(gradient) == 1 ): unit_values[:, i_unit] = ( data_raw + gradient * (elevation - ref_elevation) / 100 ) elif isinstance(gradient, list) and len(gradient) == 12: for m in range(12): month = self.data1D.time.dt.month == m + 1 month = month.to_numpy() unit_values[month, i_unit] = ( data_raw[month] + gradient[m] * (elevation - ref_elevation) / 100 ) else: raise ConfigurationError( f"Wrong gradient format: {gradient}", item_name="gradient", item_value=gradient, reason="Invalid format", ) elif method == "multiplicative_elevation_gradient": if ref_elevation is None: raise ConfigurationError( "Reference elevation not provided.", item_name="ref_elevation", reason="Missing required parameter", ) if ( isinstance(gradient, float) or isinstance(gradient, list) and len(gradient) == 1 ): unit_values[:, i_unit] = data_raw * ( 1 + gradient * (elevation - ref_elevation) / 100 ) elif isinstance(gradient, list) and len(gradient) == 12: for m in range(12): month = self.data1D.time.dt.month == m + 1 month = month.to_numpy() unit_values[month, i_unit] = data_raw[month] * ( 1 + gradient[m] * (elevation - ref_elevation) / 100 ) else: raise ConfigurationError( f"Wrong gradient format: {gradient}", item_name="gradient", item_value=gradient, reason="Invalid format", ) elif method == "multiplicative_elevation_threshold_gradients": gradient_2 = kwargs.get("gradient_2", None) elevation_threshold = kwargs.get("elevation_threshold", None) if elevation < elevation_threshold: unit_values[:, i_unit] = data_raw * ( 1 + gradient * (elevation - ref_elevation) / 100 ) elif ref_elevation > elevation_threshold: unit_values[:, i_unit] = data_raw * ( 1 + gradient_2 * (elevation - ref_elevation) / 100 ) else: precip_below = data_raw * ( 1 + gradient * (elevation_threshold - ref_elevation) / 100 ) unit_values[:, i_unit] = precip_below * ( 1 + gradient_2 * (elevation - elevation_threshold) / 100 ) else: raise ForcingError( f"Unknown method: {method}", variable=str(variable), method=method ) # Check outputs if not self._can_be_negative(variable): unit_values[unit_values < 0] = 0 # Store outputs if variable in self.data2D.data_name: idx_2d = self.data2D.data_name.index(variable) self.data2D.data[idx_2d] = unit_values else: self.data2D.data.append(unit_values) self.data2D.data_name.append(variable) self.data2D.time = self.data1D.time def _apply_spatialization_from_gridded_data( self, variable: str, method: str = "default", **kwargs: Any ) -> None: """ Spatialize 2D gridded data to hydro units with optional elevation gradients. Regrids spatial meteorological data from netCDF files to hydro units, optionally applying elevation-based corrections. Parameters ---------- variable Name or alias of the variable to spatialize. method Spatialization method. Default is 'regrid_from_netcdf'. **kwargs Parameters for the regridding operation including: - path: Path to netCDF file(s) - file_pattern: Glob pattern for multiple files - data_crs: CRS of the data (as EPSG code) - var_name: Name of variable in the netCDF file - raster_hydro_units: Path to hydro unit IDs raster - apply_data_gradient: Whether to apply elevation gradients - gradient_type: 'additive' or 'multiplicative' Raises ------ ValueError If required parameters missing or invalid method specified. """ variable = self.get_variable_enum(variable) # Specify default methods if method == "default": method = "regrid_from_netcdf" if method == "regrid_from_netcdf": path = kwargs.get("path", "") file_pattern = kwargs.get("file_pattern", None) data_crs = kwargs.get("data_crs", None) var_name = kwargs.get("var_name", "") dim_time = kwargs.get("dim_time", "time") dim_x = kwargs.get("dim_x", "x") dim_y = kwargs.get("dim_y", "y") raster_hydro_units = kwargs.get("raster_hydro_units", "") if variable in {self.Variable.P, self.Variable.T}: apply_data_gradient = kwargs.get("apply_data_gradient", True) if variable == self.Variable.P: gradient_type = kwargs.get("gradient_type", "multiplicative") elif variable == self.Variable.T: gradient_type = kwargs.get("gradient_type", "additive") else: gradient_type = kwargs.get("gradient_type", "additive") else: apply_data_gradient = kwargs.get("apply_data_gradient", False) gradient_type = kwargs.get("gradient_type", "additive") dem_path = None if apply_data_gradient: if self.catchment is None: raise DataError( "apply_data_gradient is True, but no catchment " "is defined. The catchment is required to " "retrieve the elevation-based gradients.", data_type="catchment", reason="Missing catchment", ) if self.catchment.dem is None: raise DataError( "apply_data_gradient is True, but no DEM is " "defined in the catchment. The DEM is required " "to retrieve the elevation-based gradients.", data_type="DEM", reason="Missing DEM", ) dem_path = self.catchment.dem.files # Drop items with .aux.xml extension dem_path = [p for p in dem_path if not p.endswith(".aux.xml")] if len(dem_path) > 1: raise DataError( "apply_data_gradient is True, but the catchment " "contains multiple DEM files. Only one DEM file " "is supported for the elevation-based gradients.", data_type="DEM", reason="Multiple DEM files", ) dem_path = dem_path[0] self.data2D.regrid_from_netcdf( path, file_pattern=file_pattern, data_crs=data_crs, var_name=var_name, dim_time=dim_time, dim_x=dim_x, dim_y=dim_y, hydro_units=self.hydro_units, raster_hydro_units=raster_hydro_units, apply_data_gradient=apply_data_gradient, gradient_type=gradient_type, dem_path=dem_path, ) self.data2D.data_name.append(variable) else: raise ForcingError( f"Unknown method: {method}", variable=str(variable), method=method ) def _apply_pet_computation( self, method: str, use: list[str], **kwargs: Any ) -> None: """ Compute potential evapotranspiration (PET) for all hydro units. Uses the pyet library to compute PET based on available meteorological data and the specified method. Parameters ---------- method Name of the PET computation method (e.g., 'penman', 'pm', 'hargreaves'). use List of variables to use for computation. Can include 'elevation', 'latitude'/'lat'. **kwargs Additional parameters for the PET method and unit-specific data. Raises ------ ImportError If pyet library is not installed. ValueError If required variables are not available. """ if not HAS_PYET: raise ImportError("pyet is required to do this.") pyet_args = {} use_unit_elevation = False use_unit_latitude = False if "elevation" in use: use_unit_elevation = True if "latitude" in use or "lat" in use: # If latitude provided in the arguments if "latitude" in kwargs: pyet_args["lat"] = kwargs["latitude"] * TO_RAD elif "lat" in kwargs: pyet_args["lat"] = kwargs["lat"] * TO_RAD else: use_unit_latitude = True use = self._remove_lat_elevation_options(use) self._check_variables_available(use) # Loop over the hydro units to compute the PET (pyet xarray implementation is # not working as expected in multiplicative operations) pet = np.zeros((len(self.data2D.time), len(self.hydro_units))) for i_unit, (_, unit) in enumerate(self.hydro_units.iterrows()): if use_unit_elevation: pyet_args["elevation"] = unit["elevation"].values if use_unit_latitude: pyet_args["lat"] = unit["latitude"].values * TO_RAD pyet_args = self._set_pyet_variables_data(pyet_args, use, i_unit) pet[:, i_unit] = self._compute_pet(method, pyet_args) # Store outputs if self.Variable.PET not in self.data2D.data_name: self.data2D.data.append(pet) self.data2D.data_name.append(self.Variable.PET) else: idx = self.data2D.data_name.index(self.Variable.PET) self.data2D.data[idx] = pet @staticmethod def _compute_pet(method: str, pyet_args: dict) -> np.ndarray: """ Compute PET using the pyet library. Routes the computation to the appropriate pyet method based on the specified method name. Parameters ---------- method Name of the PET computation method. pyet_args Dictionary of arguments to pass to the pyet method. Returns ------- np.ndarray Array of computed PET values. Raises ------ ValueError If the method name is not recognized. """ pyet_func_name = _PET_METHOD_MAP.get(method) if pyet_func_name is None: raise ForcingError( f"Unknown PET method: {method}", variable="PET", method=method ) return getattr(pyet, pyet_func_name)(**pyet_args) def _set_pyet_variables_data( self, pyet_args: dict, use: list[str], i_unit: int ) -> dict: """ Set meteorological variables for pyet computation. Extracts spatialized meteorological data for a specific hydro unit and adds it to the pyet arguments dictionary using pyet-compatible variable names. Parameters ---------- pyet_args Dictionary of arguments for pyet. Will be updated with variable data. use List of variable names to extract and add. i_unit Index of the hydro unit. Returns ------- dict Updated pyet_args dictionary with meteorological variables added. """ for v in use: v = self.get_variable_enum(v) idx = self.data2D.data_name.index(v) pyet_var_name = { self.Variable.T: "tmean", self.Variable.T_MIN: "tmin", self.Variable.T_MAX: "tmax", self.Variable.T_DEW_POINT: "tdew", self.Variable.RH: "rh", self.Variable.RH_MIN: "rhmin", self.Variable.RH_MAX: "rhmax", self.Variable.R_NET: "rn", self.Variable.R_SOLAR: "rs", self.Variable.WIND: "wind", self.Variable.PRESSURE: "pressure", } pyet_args[pyet_var_name.get(v)] = pd.Series( self.data2D.data[idx][:, i_unit], index=self.data2D.time ) return pyet_args def _check_variables_available(self, use: list[str]) -> None: """ Validate that all required variables are available in data2D. Parameters ---------- use List of variable names to check. Raises ------ ValueError If any variable is not available in data2D. """ # Check if all variables are available use = [self.get_variable_enum(v) for v in use] for v in use: if v not in self.data2D.data_name: raise DataError( f"Variable {v} is not available.", data_type="forcing variable", reason="Variable not in data", ) @staticmethod def _remove_lat_elevation_options(use: list[str]) -> list[str]: """ Remove latitude and elevation options from variable list. These are special options that don't correspond to meteorological variables in the data but rather to spatial parameters. Parameters ---------- use List of variable names. Returns ------- list[str] Filtered list with latitude and elevation removed. """ # Remove latitude and elevation from the list use = [u for u in use if u not in ["latitude", "lat", "elevation"]] return use