from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
import numpy as np
import pandas as pd
from hydrobricks import gpd, rasterio
from hydrobricks._constants import ICE_WE
from hydrobricks._exceptions import ConfigurationError, DataError, DependencyError
from hydrobricks._optional import HAS_PYPROJ, HAS_RASTERIO, HAS_SHAPELY
if TYPE_CHECKING:
from hydrobricks.catchment import Catchment
from hydrobricks.hydro_units import HydroUnits
if HAS_SHAPELY:
from shapely.geometry import MultiPolygon, mapping
if HAS_RASTERIO:
from rasterio.mask import mask
[docs]
class GlacierEvolutionDeltaH:
"""
Class for glacier mass balance. It is based on the following papers:
- Seibert, J., Vis, M. J. P., Kohn, I., Weiler, M., and Stahl, K.: Technical note:
Representing glacier geometry changes in a semi-distributed hydrological model,
Hydrol. Earth Syst. Sci., 22, 2211–2224,
https://doi.org/10.5194/hess-22-2211-2018, 2018.
- Huss, M., Jouvet, G., Farinotti, D., and Bauder, A.: Future high-mountain
hydrology: a new parameterization of glacier retreat, Hydrol. Earth Syst. Sci.,
14, 815–829, https://doi.org/10.5194/hess-14-815-2010, 2010.
"""
def __init__(self, hydro_units: HydroUnits | None = None) -> None:
"""
Initialize the GlacierMassBalance class.
Parameters
----------
hydro_units
The hydro unit object.
"""
self.glacier_df: pd.DataFrame | None = None
self.hydro_units: pd.DataFrame | None = None
self.catchment_area: float | None = None
if hydro_units is not None:
self.hydro_units = hydro_units.hydro_units
self.catchment_area = np.sum(self.hydro_units.area.values)
self.hydro_unit_ids: np.ndarray | None = None
# Pure elevation bands for glacier discretization.
self.elev_bands: np.ndarray | None = None
# Elevation bands subdivided by hydro units.
self.elev_bands_parts: np.ndarray | None = None
# Indices for the elevation bands parts.
self.elev_bands_indices: np.ndarray | None = None
self.pixel_based_approach: bool = False
self.sub_elevation_parts: bool = False
# Tables
self.lookup_table_area: np.ndarray | None = None
self.lookup_table_volume: np.ndarray | None = None
# Glacier water equivalent (we) per part.
self.we_parts: np.ndarray | None = None
# Glacier water equivalent (we) per elevation band.
self.we_bands: np.ndarray | None = None
# Glacier areas as a percentage of the catch. area.
self.areas_pc_parts: np.ndarray | None = None
# Same per elevation band.
self.areas_pc_bands: np.ndarray | None = None
# Delta-h parametrization based on Huss et al. (2010).
self.a_coeff: float = np.nan
self.b_coeff: float = np.nan
self.c_coeff: float = np.nan
self.gamma_coeff: float = np.nan
# Method-related attributes
self.initial_total_glacier_we: float = np.nan
self.norm_elevations_bands: np.ndarray | None = None
self.norm_delta_we_bands: np.ndarray | None = None
self.scaling_factor_mm: float = np.nan
self.excess_melt_we: int | float = 0
# Pixel-wise ice water equivalent (we) for each elevation band
self.px_ice_we: np.ndarray | None = None
[docs]
def compute_ice_thickness_change(
self,
catchment: Catchment,
ice_thickness_old: str,
ice_thickness_new: str,
elevation_bands_distance: int = 10,
smooth_window: int | None = 5,
nodata: float | None = None,
) -> pd.DataFrame:
"""
Compute the normalized ice thickness change between two glacier ice thickness
rasters for the delta-h method.
Computes elevation-dependent ice thickness changes and normalizes them
for use in the delta-h parameterization. Results are stored in
self.norm_elevations_bands and self.norm_delta_we_bands for later use.
Parameters
----------
catchment
The catchment object.
ice_thickness_old
Path to the TIF file containing the old glacier thickness in meters.
ice_thickness_new
Path to the TIF file containing the new glacier thickness in meters.
elevation_bands_distance
Distance between elevation bands in meters. Default is 10 m.
smooth_window
Optional integer window for 1D smoothing (pandas rolling) applied to
the aggregated curve.
nodata
Value to treat as nodata in inputs (replaced with np.nan).
Returns
-------
pd.DataFrame
The glacier_change_df DataFrame containing the normalized
ice thickness change data.
"""
if not HAS_PYPROJ:
raise DependencyError(
"pyproj is required to compute ice thickness change.",
package_name="pyproj",
operation="GlacierEvolutionDeltaH.compute_ice_thickness_change",
install_command="pip install pyproj",
)
# Discretize the DEM into elevation bands at the given distance
elevations, map_bands_ids = self._discretize_elevation_bands(
catchment, elevation_bands_distance, hydro_units_needed=False
)
# Extract the ice thickness and resample it to the DEM resolution
catchment.extract_attribute_raster(
ice_thickness_old,
"ice_thickness_old",
resample_to_dem_resolution=True,
resampling="average",
)
catchment.extract_attribute_raster(
ice_thickness_new,
"ice_thickness_new",
resample_to_dem_resolution=True,
resampling="average",
)
# Get the ice thickness data
ice_thickness_old = catchment.attributes["ice_thickness_old"]["data"]
ice_thickness_old[catchment.dem_data == 0] = 0.0
ice_thickness_old[ice_thickness_old < 0] = 0.0
ice_thickness_new = catchment.attributes["ice_thickness_new"]["data"]
ice_thickness_new[catchment.dem_data == 0] = 0.0
ice_thickness_new[ice_thickness_new < 0] = 0.0
# Handle nodata
if nodata is not None:
ice_thickness_old[ice_thickness_old == nodata] = np.nan
ice_thickness_new[ice_thickness_new == nodata] = np.nan
# Check shape consistency
if ice_thickness_old.shape != ice_thickness_new.shape:
raise DataError(
"The old and new ice thickness rasters must have the same shape.",
data_type="ice thickness",
reason="Shape mismatch between rasters",
)
map_bands_ids[(ice_thickness_old == 0) & (ice_thickness_new == 0)] = 0
band_ids = np.unique(map_bands_ids)
band_ids = band_ids[band_ids != 0]
ice_thickness_diff = ice_thickness_new - ice_thickness_old
# Create the dataframe for the glacier change data
change_df = None
for band_id in band_ids:
new_row = pd.DataFrame(
[[band_id, elevations[band_id - 1], 0.0, 0.0, 0.0]],
columns=[
("band_id", "-"),
("elevation", "m"),
("glacier_thickness_old", "m"),
("glacier_thickness_new", "m"),
("glacier_thickness_diff", "m"),
],
)
if change_df is None:
change_df = new_row
else:
change_df = pd.concat([change_df, new_row], ignore_index=True)
# Update the dataframe with the ice thickness
for idx, row in change_df.iterrows():
band_id = row[("band_id", "-")]
mask_band = map_bands_ids == band_id
mask_nan = np.isnan(ice_thickness_old) | np.isnan(ice_thickness_new)
# Get the ice thickness for the corresponding band and unit
masked_thick_old = ice_thickness_old[mask_band]
masked_thick_old = masked_thick_old[~mask_nan[mask_band]]
masked_thick_new = ice_thickness_new[mask_band]
masked_thick_new = masked_thick_new[~mask_nan[mask_band]]
masked_thick_diff = ice_thickness_diff[mask_band]
masked_thick_diff = masked_thick_diff[~mask_nan[mask_band]]
# Compute the mean thickness
if masked_thick_old.size > 0:
mean_thickness_old = float(np.mean(masked_thick_old))
else:
mean_thickness_old = 0.0
if masked_thick_new.size > 0:
mean_thickness_new = float(np.mean(masked_thick_new))
else:
mean_thickness_new = 0.0
if masked_thick_diff.size > 0:
mean_thickness_diff = float(np.mean(masked_thick_diff))
else:
mean_thickness_diff = 0.0
change_df.at[idx, ("glacier_thickness_old", "m")] = mean_thickness_old
change_df.at[idx, ("glacier_thickness_new", "m")] = mean_thickness_new
change_df.at[idx, ("glacier_thickness_diff", "m")] = mean_thickness_diff
# Remove rows where the new thickness is zero (no glacier)
change_df = change_df[change_df["glacier_thickness_new", "m"] > 0].reset_index(
drop=True
)
# Smooth the thickness change curve
ser_dh = -change_df["glacier_thickness_diff", "m"]
ser_dh[ser_dh < 0] = 0
if smooth_window is not None and smooth_window:
if np.isfinite(ser_dh).any():
ser_dh = (
ser_dh.rolling(window=smooth_window, center=True, min_periods=1)
.mean()
.to_numpy()
)
# Normalize the thickness change
min_dh = np.nanmin(ser_dh)
if min_dh > 0:
ser_dh = ser_dh - min_dh
max_dh = np.nanmax(ser_dh)
if max_dh > 0:
ser_dh = ser_dh / max_dh
change_df["dh", "-"] = ser_dh
# Normalize the elevation
max_elevation_m = np.max(change_df["elevation", "m"].values)
min_elevation_m = np.min(change_df["elevation", "m"].values)
norm_elevations = (max_elevation_m - change_df["elevation", "m"].values) / (
max_elevation_m - min_elevation_m
)
change_df["normalized_elevation", "-"] = norm_elevations
return change_df
[docs]
def compute_initial_ice_thickness(
self,
catchment: Catchment,
glacier_outline: str | None = None,
ice_thickness: str | None = None,
elevation_bands_distance: int = 10,
pixel_based_approach: bool = True,
) -> pd.DataFrame:
"""
Extract the initial ice thickness to be used in compute_lookup_table()
for the glacier mass balance calculation.
Parameters
----------
catchment
The catchment object.
glacier_outline
Path to the SHP file containing the glacier extents.
If None, the glacier extents are extracted from the ice thickness geotiff.
Either this or ice_thickness should be provided.
ice_thickness
Path to the TIF file containing the glacier thickness in meters.
If None, the ice thickness is estimated based on the glacier area
using the Bahr et al. (1997) formula.
Either this or glacier_outline should be provided.
elevation_bands_distance
Distance between elevation bands in meters. Default is 10 m.
pixel_based_approach
Whether to compute the glacier area evolution from the topography.
If True, the glacier area is computed from the topography and the
ice thicknesses are updated accordingly. Otherwise, the glacier area
is updated based on the Bahr et al. (1997) formula.
Default is True.
Returns
-------
The glacier_df DataFrame containing the glacier data.
"""
if glacier_outline is None and ice_thickness is None:
raise DataError(
"Either glacier_outline or ice_thickness should be provided.",
data_type="glacier",
reason="Missing required parameter",
)
if glacier_outline is not None and ice_thickness is not None:
raise DataError(
"Either glacier_outline or ice_thickness should be provided, not both.",
data_type="glacier",
reason="Conflicting parameters",
)
self.pixel_based_approach = pixel_based_approach
# Discretize the DEM into elevation bands at the given distance
elevations, map_bands_ids = self._discretize_elevation_bands(
catchment, elevation_bands_distance
)
# Extract the ice thickness from a TIF file created either from geophysical
# measurements or calculated based on an inversion of surface topography
# (Farinotti et al., 2009a,b; Huss et al., 2010)).
if ice_thickness is not None:
if not HAS_PYPROJ:
raise DependencyError(
"pyproj is required to process glacier ice thickness data.",
package_name="pyproj",
operation="GlacierEvolutionDeltaH.initialize_from_glacier_data",
install_command="pip install pyproj",
)
# Extract the ice thickness and resample it to the DEM resolution
catchment.extract_attribute_raster(
ice_thickness,
"ice_thickness",
resample_to_dem_resolution=True,
resampling="average",
)
ice_thickness = catchment.attributes["ice_thickness"]["data"]
ice_thickness[catchment.dem_data == 0] = 0.0
glaciers_mask = np.zeros(catchment.dem_data.shape)
glaciers_mask[ice_thickness > 0] = 1
else:
# Extract the glacier cover from the shapefile
glaciers_mask = self._extract_glacier_mask_from_shapefile(
catchment, glacier_outline
)
glacier_patches = self._get_glacier_patches(
catchment, map_bands_ids, glaciers_mask
)
# Create the dataframe for the glacier data
glacier_df = None
for band_id, unit_id, area in glacier_patches:
new_row = pd.DataFrame(
[[band_id, elevations[band_id - 1], area, 0.0, unit_id]],
columns=[
("band_id", "-"),
("elevation", "m"),
("glacier_area", "m2"),
("glacier_thickness", "m"),
("hydro_unit_id", "-"),
],
)
if glacier_df is None:
glacier_df = new_row
else:
glacier_df = pd.concat([glacier_df, new_row], ignore_index=True)
# Extract the ice thickness from a TIF file created either from geophysical
# measurements or calculated based on an inversion of surface topography
# (Farinotti et al., 2009a,b; Huss et al., 2010)).
if ice_thickness is not None:
if self.pixel_based_approach:
self.px_ice_we = np.empty((1, len(glacier_df)), dtype=object)
# Update the dataframe with the ice thickness
for i_row, (idx, row) in enumerate(glacier_df.iterrows()):
band_id = row[("band_id", "-")]
unit_id = row[("hydro_unit_id", "-")]
# Get the ice thickness for the corresponding band and unit
mask_band = map_bands_ids == band_id
mask_unit = catchment.map_unit_ids == unit_id
masked_thickness = ice_thickness[mask_band & mask_unit]
masked_thickness = masked_thickness[masked_thickness > 0]
masked_thickness = masked_thickness[~np.isnan(masked_thickness)]
if self.pixel_based_approach:
self.px_ice_we[0, i_row] = masked_thickness * ICE_WE * 1000
# Compute the mean thickness
if masked_thickness.size > 0:
mean_thickness = float(np.mean(masked_thickness))
else:
mean_thickness = 0.0
glacier_df.at[idx, ("glacier_thickness", "m")] = mean_thickness
else:
# Estimation of the overall ice volume using the Bahr et al. (1997)
# formula, to estimate the mean ice thickness.
total_area = np.sum(glacier_df[("glacier_area", "m2")])
total_volume = np.power(total_area, 1.36)
mean_thickness = total_volume / total_area
nonzero_mask = glacier_df[("glacier_area", "m2")] != 0
glacier_df[("glacier_thickness", "m")] = glacier_df[("glacier_area", "m2")]
glacier_df.loc[nonzero_mask, ("glacier_thickness", "m")] = mean_thickness
self.glacier_df = glacier_df
self.hydro_units = catchment.hydro_units.hydro_units
self.catchment_area = np.sum(self.hydro_units.area.values)
return self.glacier_df
[docs]
def compute_lookup_table(
self,
catchment: Catchment | None = None,
glacier_profile_csv: str | None = None,
glacier_df: pd.DataFrame | None = None,
observed_dh: pd.DataFrame | None = None,
nb_increments: int = 200,
update_width: bool = True,
update_width_reference: str = "initial",
):
"""
Prepare the glacier mass balance lookup table. The glacier mass balance is
calculated using the delta-h method (Huss et al., 2010) using the
lookup table method (Seibert et al., 2018). The glacier mass balance is
calculated for each elevation band and melt increment and is synthesized
in a lookup table per hydro unit.
Parameters
----------
catchment
The catchment object. Needed when pixel_based_approach is True.
glacier_profile_csv
Path to the CSV file containing glacier data. An elevation band is smaller
than a hydro unit, usually around 10 m high. It should contain the
following columns:
- elevation: Elevation (m) for each elevation band i (e.g. every 10m).
- glacier_area: Glacier area (m2) for each elevation band i.
- glacier_thickness: Glacier thickness (m) for each elevation band i.
- hydro_unit_id: Hydro unit ID for each elevation band i.
Format example:
elevation glacier_area glacier_thickness hydro_unit_id
m m2 m -
1670 0 0 2
1680 0 0 2
1690 2500 14.7 2
1700 9375 23.2 3
1710 11875 25.2 3
1720 9375 23.2 3
1730 9375 23.2 3
... ... ... ...
If not provided, the glacier data is assumed to be already loaded in the
glacier_df attribute or to be passed as a DataFrame.
glacier_df
DataFrame containing glacier data. Alternative to glacier_profile_csv.
observed_dh
Optional dataframe containing observed delta-h values.
nb_increments
Number of increments for glacier mass balance calculation. Default is 200.
update_width
Whether to update the glacier width at each iteration (Eq. 7 Seibert et al.,
2018). Default is True. Ignored if pixel_based_approach is True.
update_width_reference
Reference for updating the glacier width (Eq. 7 Seibert et al., 2018).
Default is 'initial'. Options are:
- 'initial': Use the initial glacier width.
- 'previous': Use the glacier width from the previous iteration.
Ignored if pixel_based_approach is True.
"""
assert (
self.hydro_units is not None
), "Hydro units are not defined. Please load them first."
assert self.catchment_area is not None, "Catchment area is not defined."
if glacier_profile_csv is not None and glacier_df is not None:
raise DataError(
"Please provide either glacier_profile_csv or glacier_df, not both.",
data_type="glacier",
reason="Conflicting parameters",
)
if glacier_profile_csv is not None:
# Read the glacier data
self.glacier_df = pd.read_csv(glacier_profile_csv, header=[0, 1])
elif glacier_df is not None:
self.glacier_df = glacier_df
assert (
self.glacier_df is not None
), "Glacier data is not defined. Please provide a CSV file or a DataFrame."
if self.pixel_based_approach and catchment is None:
raise ConfigurationError(
"When pixel_based_approach is True, the catchment must be provided.",
item_name="pixel_based_approach",
reason="Missing required catchment parameter",
)
if self.pixel_based_approach:
# Add rows corresponding to the number of increments
cols = self.px_ice_we.shape[1]
new_rows = np.empty((nb_increments, cols), dtype=object)
self.px_ice_we = np.vstack([self.px_ice_we, new_rows])
# We have to remove the bands with no glacier area
# (otherwise the min and max elevations are wrong)
self.glacier_df = self.glacier_df.drop(
self.glacier_df[self.glacier_df[("glacier_area", "m2")] == 0].index
)
# Extract the relevant columns
elev_bands_parts = self.glacier_df[("elevation", "m")].values
initial_areas_m2 = self.glacier_df[("glacier_area", "m2")].values
initial_we_mm = self.glacier_df[("glacier_thickness", "m")].values
initial_we_mm = initial_we_mm * ICE_WE * 1000
hydro_unit_ids = self.glacier_df[("hydro_unit_id", "-")].values
self.excess_melt_we = 0
self.elev_bands_parts = elev_bands_parts
self.hydro_unit_ids = hydro_unit_ids
self.we_parts = np.zeros((nb_increments + 1, len(elev_bands_parts)))
self.we_parts[0] = initial_we_mm
self.areas_pc_parts = np.zeros((nb_increments + 1, len(elev_bands_parts)))
self.areas_pc_parts[0] = initial_areas_m2 / self.catchment_area
self.lookup_table_area = np.zeros(
(nb_increments + 1, len(np.unique(hydro_unit_ids)))
)
self.lookup_table_volume = np.zeros(
(nb_increments + 1, len(np.unique(hydro_unit_ids)))
)
# In case the catchment is discretized by something else than elevation only
self.elev_bands, self.elev_bands_indices = np.unique(
elev_bands_parts, return_inverse=True
)
if len(self.elev_bands) == len(elev_bands_parts):
self.sub_elevation_parts = False
else:
self.sub_elevation_parts = True
self._init_parts_containers(nb_increments)
self._initialization()
for increment in range(1, nb_increments): # last row kept with zeros
self._compute_delta_h(increment, nb_increments, observed_dh)
self._update_glacier_thickness(increment)
self._width_scaling(
increment, update_width, update_width_reference, catchment=catchment
)
if not update_width and not self.pixel_based_approach:
self._final_width_scaling()
self._compute_sub_band_parts()
self._update_lookup_tables()
[docs]
def get_lookup_table_area(self) -> pd.DataFrame:
"""
Get the glacier area evolution lookup table.
Returns
-------
The glacier area evolution lookup table.
"""
return pd.DataFrame(
self.lookup_table_area,
index=range(self.lookup_table_area.shape[0]),
columns=np.unique(self.hydro_unit_ids),
)
[docs]
def get_lookup_table_volume(self) -> pd.DataFrame:
"""
Get the glacier volume evolution lookup table.
Returns
-------
The glacier volume evolution lookup table.
"""
return pd.DataFrame(
self.lookup_table_volume,
index=range(self.lookup_table_volume.shape[0]),
columns=np.unique(self.hydro_unit_ids),
)
[docs]
def save_as_csv(self, output_dir: str | Path):
"""
Save the glacier evolution lookup table as a CSV file.
Parameters
----------
output_dir
Path to the directory where the CSV file will be saved.
"""
output_dir = Path(output_dir)
# Write record to the lookup table
lookup_table_area = pd.DataFrame(
self.lookup_table_area,
index=range(self.lookup_table_area.shape[0]),
columns=np.unique(self.hydro_unit_ids),
)
lookup_table_area.to_csv(
output_dir / "glacier_evolution_lookup_table_area.csv", index=False
)
lookup_table_volume = pd.DataFrame(
self.lookup_table_volume,
index=range(self.lookup_table_volume.shape[0]),
columns=np.unique(self.hydro_unit_ids),
)
lookup_table_volume.to_csv(
output_dir / "glacier_evolution_lookup_table_volume.csv", index=False
)
if self.areas_pc_parts is not None:
columns = pd.MultiIndex.from_arrays(
[
range(len(self.areas_pc_parts[0])),
self.hydro_unit_ids,
self.elev_bands_parts,
],
names=["id", "hydro_unit_id", "elevation_band"],
)
details_glacier_areas = pd.DataFrame(
self.areas_pc_parts * self.catchment_area,
index=range(self.areas_pc_parts.shape[0]),
columns=columns,
)
details_glacier_areas.to_csv(
output_dir / "details_glacier_areas_evolution.csv", index=False
)
if self.we_parts is not None:
columns = pd.MultiIndex.from_arrays(
[
range(len(self.we_parts[0])),
self.hydro_unit_ids,
self.elev_bands_parts,
],
names=["id", "hydro_unit_id", "elevation_band"],
)
details_glacier_we = pd.DataFrame(
self.we_parts, index=range(self.we_parts.shape[0]), columns=columns
)
details_glacier_we.to_csv(
output_dir / "details_glacier_we_evolution.csv", index=False
)
def _init_parts_containers(self, nb_increments: int):
"""
Initialize the containers for sub-elevation parts.
"""
if not self.sub_elevation_parts:
self.areas_pc_bands = self.areas_pc_parts
self.we_bands = self.we_parts
self.elev_bands_indices = np.arange(len(self.elev_bands))
else:
# Loop over elevation groups and sum relevant columns
self.areas_pc_bands = np.zeros((nb_increments + 1, len(self.elev_bands)))
self.we_bands = np.zeros((nb_increments + 1, len(self.elev_bands)))
for _, elev_idx in enumerate(np.arange(len(self.elev_bands))):
mask = self.elev_bands_indices == elev_idx
self.areas_pc_bands[0, elev_idx] = self.areas_pc_parts[0, mask].sum()
pc = self.areas_pc_parts[0, mask] / self.areas_pc_bands[0, elev_idx]
self.we_bands[0, elev_idx] = np.sum(self.we_parts[0, mask] * pc)
def _initialization(self):
"""
Step 1 of Seibert et al. (2018): Calculation of the initial total glacier mass
and normalization of glacier elevations. Selection of the right glacial
parametrization (delta-h method).
"""
# Calculate total glacier mass (Eq. 2)
# areas: area (expressed as a proportion of the catchment area) for each
# elevation band i
self.initial_total_glacier_we = np.sum(
self.areas_pc_parts[0] * self.we_parts[0, :]
)
# Normalize glacier elevations (Eq. 3)
# elevations: absolute elevation for each elevation band i
max_elevation_m = np.max(self.elev_bands_parts)
min_elevation_m = np.min(self.elev_bands_parts)
self.norm_elevations_bands = (max_elevation_m - self.elev_bands) / (
max_elevation_m - min_elevation_m
)
# Choose the glacial parametrization (Huss et al., 2010, Fig. 2a)
glacier_area_km2 = (
np.sum(self.areas_pc_parts[0]) * self.catchment_area / (1000 * 1000)
)
self._set_delta_h_parametrization(glacier_area_km2)
def _set_delta_h_parametrization(self, glacier_area_km2: float):
"""
Delta-h parametrization of the glacier. See Huss et al. (2010).
"""
if glacier_area_km2 > 20: # Large valley glacier
self.a_coeff = -0.02
self.b_coeff = 0.12
self.c_coeff = 0.00
self.gamma_coeff = 6
elif 5 <= glacier_area_km2 <= 20: # Medium valley glacier
self.a_coeff = -0.05
self.b_coeff = 0.19
self.c_coeff = 0.01
self.gamma_coeff = 4
elif 0 < glacier_area_km2 < 5: # Small glacier
self.a_coeff = -0.30
self.b_coeff = 0.60
self.c_coeff = 0.09
self.gamma_coeff = 2
def _compute_delta_h(self, increment: int, nb_increments: int, observed_dh: None):
"""
Step 2 of Seibert et al. (2018): Apply the delta-h method to calculate the
change in glacier mass balance.
"""
# Apply ∆h-parameterization (Eq. 4)
if observed_dh is not None:
# Use observed delta-h values
obs_norm_dh = observed_dh["dh", "-"].values
obs_norm_elev = observed_dh["normalized_elevation", "-"].values
# Interpolate to the model elevation bands
interp_norm_dh = np.interp(
self.norm_elevations_bands, obs_norm_elev[::-1], obs_norm_dh[::-1]
)
self.norm_delta_we_bands = interp_norm_dh
else:
# Gives the normalized (dimensionless) ice thickness change for each
# elevation band i
self.norm_delta_we_bands = (
np.power(self.norm_elevations_bands + self.a_coeff, self.gamma_coeff)
+ self.b_coeff * (self.norm_elevations_bands + self.a_coeff)
+ self.c_coeff
)
# Calculate the scaling factor fs (Eq. 5)
# The glacier volume change increment, added with the excess melt that could
# not be melted in the previous step
glacier_we_change_mm = (
self.initial_total_glacier_we / nb_increments + self.excess_melt_we
)
# "A scaling factor fS (mm), which scales the dimensionless ∆h, is computed
# based on the glacier volume change M and on the area and normalized water
# equivalent change for each of the elevation bands."
self.scaling_factor_mm = glacier_we_change_mm / (
np.sum(self.areas_pc_bands[increment - 1] * self.norm_delta_we_bands)
)
def _update_glacier_thickness(self, increment: int):
"""
Step 3 of Seibert et al. (2018): "For each elevation band reduce the glacier
water equivalent according to the empirical functions from Huss et al. (2010)
(Eq. 4) to compute the glacier geometry for the reduced mass (see Eq. 6).
If the computed thickness change is larger than the remaining glacier thickness
(most likely to occur at the glacier tongue [...]), the glacier thickness is
reduced to zero, resulting in a glacier-free elevation band, and the portion of
the glacier thickness change that would have resulted in a negative glacier
thickness is included in the next iteration step (i.e. the next 1% melt)."
"""
# Update glacier thicknesses (Eq. 6)
if not self.pixel_based_approach:
# Glacier water equivalent reduction computed per elevation band
we_reduction_bands = self.scaling_factor_mm * self.norm_delta_we_bands
# Update glacier thicknesses per elevation band
new_we_bands = self.we_bands[increment - 1] - we_reduction_bands
self.we_bands[increment] = np.where(new_we_bands < 0, 0, new_we_bands)
# This excess melt is taken into account in Step 2.
self.excess_melt_we = -np.sum(
np.where(new_we_bands < 0, new_we_bands, 0)
* self.areas_pc_bands[increment - 1]
)
return
# If the glacier area evolution is based on the topography
# Glacier water equivalent reduction
we_reduction_mm = self.scaling_factor_mm * self.norm_delta_we_bands
# Copy the previous increment values as starting point
self.we_bands[increment, :] = self.we_bands[increment - 1, :]
self.we_parts[increment, :] = self.we_parts[increment - 1, :]
self.px_ice_we[increment, :] = self.px_ice_we[increment - 1, :].copy()
# Compute the melt
excess_melt = np.zeros(len(self.elev_bands))
for i_band in np.arange(len(self.elev_bands)):
band_mask = self.elev_bands_indices == i_band
melt = we_reduction_mm[i_band]
# If the whole band melts, store the excess melt
if self.we_bands[increment, i_band] < melt:
excess_melt[i_band] = self.we_bands[increment, i_band] - melt
self.we_bands[increment, i_band] = 0
self.we_parts[increment, band_mask] = 0
for idx in np.where(band_mask)[0]:
self.px_ice_we[increment, idx] = np.array([])
continue
# Otherwise, update the glacier water equivalent of the parts
band_mask_idx = np.where(band_mask)[0]
while melt > 0:
excess_melt_parts = np.zeros(len(band_mask_idx))
for i_part, idx in enumerate(band_mask_idx):
# Skip parts with no glacier w.e. (happens during iterations)
if self.we_parts[increment, idx] <= 0:
continue
# If the whole part melts, store the excess melt locally
if self.we_parts[increment, idx] < melt:
excess_melt_parts[i_part] = self.we_parts[increment, idx] - melt
self.we_parts[increment, idx] = 0
self.px_ice_we[increment, idx] = np.array([])
continue
# Otherwise, update the glacier water equivalent of the pixels
px_values = self.px_ice_we[increment, idx]
px_values -= melt
# Ensure that the pixel-wise ice water equivalent is not negative
while np.any(px_values < 0):
px_melt_deficit = -np.sum(px_values[px_values < 0])
px_melt = px_melt_deficit / len(px_values[px_values > 0])
px_values = np.where(px_values <= 0, 0, px_values - px_melt)
self.px_ice_we[increment, idx] = px_values
# Update the glacier water equivalent for the part
new_we = np.mean(px_values)
ref_we = self.we_parts[increment, idx]
assert np.isclose(new_we, ref_we - melt, atol=1e-02), (
f"Mismatch in glacier w.e. for part {idx} "
f"({ref_we - melt} != {new_we})"
)
assert new_we >= 0, f"Negative glacier w.e. for part {idx}"
self.we_parts[increment, idx] = new_we
# Compute the excess melt for the band
excess_melt_parts_tot = -np.sum(
excess_melt_parts * self.areas_pc_parts[increment - 1, band_mask]
)
# Scale the excess melt to area of the positive parts
pos_areas = self.areas_pc_parts[increment - 1, band_mask]
pos_areas = pos_areas[self.we_parts[increment, band_mask] > 0]
melt = excess_melt_parts_tot / np.sum(pos_areas)
# This excess melt is taken into account in Step 2.
self.excess_melt_we = -np.sum(excess_melt * self.areas_pc_bands[increment - 1])
def _width_scaling(
self,
increment: int,
update_width: bool,
update_width_reference: str = "initial",
catchment: Catchment = None,
):
"""
Step 4 of Seibert et al. (2018): "The width scaling within each elevation band
relates a decrease in glacier thickness to a reduction of the glacier area
within the respective elevation band. In other words, this approach also allows
for glacier area shrinkage at higher elevations, which mimics the typical
spatial effect of the downwasting of glaciers. Relation from Bahr et al. (1997)"
"""
if not self.pixel_based_approach:
if update_width:
if update_width_reference == "previous":
pos_we = self.we_bands[increment - 1] > 0
self.areas_pc_bands[increment, pos_we] = self.areas_pc_bands[
increment - 1, pos_we
] * np.power(
self.we_bands[increment, pos_we]
/ self.we_bands[increment - 1, pos_we],
0.5,
)
elif update_width_reference == "initial":
pos_we = self.we_bands[0] > 0
self.areas_pc_bands[increment, pos_we] = self.areas_pc_bands[
0, pos_we
] * np.power(
self.we_bands[increment, pos_we] / self.we_bands[0, pos_we], 0.5
)
else:
raise ConfigurationError(
"update_width_reference should be either "
"'previous' or 'initial'.",
item_name="update_width_reference",
item_value=update_width_reference,
reason="Invalid parameter value",
)
# Conservation of the water equivalent
pos_we = self.we_bands[increment] > 0
self.we_bands[increment, pos_we] *= (
self.areas_pc_bands[increment - 1, pos_we]
/ self.areas_pc_bands[increment, pos_we]
)
# Nullify the areas of the elevation bands with no glacier w.e.
self.areas_pc_bands[increment, self.we_bands[increment] == 0] = 0
else:
# If the glacier width is not updated, keep the previous glacier area.
self.areas_pc_bands[increment] = self.areas_pc_bands[increment - 1]
# Nullify the areas of the elevation bands with no glacier water equiv.
self.areas_pc_bands[increment, self.we_bands[increment] == 0] = 0
else:
self.areas_pc_bands[increment] = self.areas_pc_bands[increment - 1]
px_area = catchment.get_dem_pixel_area()
# Update the glacier area based on the pixel-wise ice water equivalent
areas = np.zeros(len(self.elev_bands_parts))
for i in range(len(self.elev_bands_parts)):
ice_we = self.px_ice_we[increment, i]
areas[i] = np.count_nonzero(ice_we) * px_area
# Drop pixels with no ice water equivalent
self.px_ice_we[increment, i] = ice_we[ice_we > 0]
self.areas_pc_parts[increment, :] = areas / self.catchment_area
# Adjust the glacier ice w.e. per part (px with 0 ice w.e. were dropped)
self.we_parts[increment] = np.array(
[
np.mean(arr) if arr.size > 0 else 0
for arr in self.px_ice_we[increment]
]
)
# Updating the band areas and band thicknesses
for elev_idx in np.arange(len(self.elev_bands)):
band_mask = self.elev_bands_indices == elev_idx
self.areas_pc_bands[increment, elev_idx] = self.areas_pc_parts[
increment, band_mask
].sum()
band_we = np.concatenate(self.px_ice_we[increment, band_mask])
if band_we.size == 0:
self.we_bands[increment, elev_idx] = 0
else:
self.we_bands[increment, elev_idx] = np.mean(band_we)
def _final_width_scaling(self):
"""
Similar to _width_scaling, but for the case when the glacier area is not
updated during the loop.
"""
# Width scaling (Eq. 7)
for i in range(1, len(self.we_bands)):
pos_we = self.we_bands[0] > 0
self.areas_pc_bands[i, pos_we] = self.areas_pc_bands[0, pos_we] * np.power(
self.we_bands[i, pos_we] / self.we_bands[0, pos_we], 0.5
)
# Conservation of the water equivalent
pos_we = self.we_bands[i] > 0
self.we_bands[i, pos_we] *= (
self.areas_pc_bands[0, pos_we] / self.areas_pc_bands[i, pos_we]
)
def _adjust_areas_parts(self, increment):
"""
Adjust the areas of the sub-band part.
"""
self.areas_pc_parts[increment] = self.areas_pc_parts[0]
# Nullify the areas of the elevation bands with no glacier water equivalent
self.areas_pc_parts[increment, self.we_parts[increment] == 0] = 0
for _, elev_idx in enumerate(np.arange(len(self.elev_bands))):
band_mask = (self.elev_bands_indices == elev_idx) & (
self.we_parts[increment] > 0
)
if not np.any(band_mask):
continue
# Scaling each part of the total band proportionally
self.areas_pc_parts[increment, band_mask] *= (
self.areas_pc_bands[increment, elev_idx]
/ self.areas_pc_bands[0, elev_idx]
)
# No need to adjust the glacier thicknesses here, as they are already
# accounted for by the scaled band w.e.
def _adjust_we_parts(self, increment):
"""
Adjust the volume of the sub-band part.
"""
# Decreasing glacier w.e. per elevation band parts proportionally
# to the glacier area per elevation band parts
self.we_parts[increment] = self.we_parts[0]
for elev_idx in range(len(self.elev_bands)):
band_mask = self.elev_bands_indices == elev_idx
if self.we_bands[increment, elev_idx] == 0:
self.we_parts[increment, band_mask] = 0
continue
self.we_parts[increment, band_mask] *= (
self.we_bands[increment, elev_idx] / self.we_bands[0, elev_idx]
)
def _compute_sub_band_parts(self):
"""
For discretization of hydro units into smaller units than elevation bands,
we need to compute the glacier areas and water equivalent for each part.
"""
if self.pixel_based_approach:
return
if not self.sub_elevation_parts:
self.areas_pc_parts = self.areas_pc_bands
self.we_parts = self.we_bands
return
for i in range(1, len(self.we_bands)):
# Update the w.e. per elevation band parts
self._adjust_we_parts(i)
# Redistribution following the sub-elevation discretization.
self._adjust_areas_parts(i)
def _update_lookup_tables(self):
"""
Step 5 (6) of Seibert et al. (2018): "Sum the total (width-scaled) areas for all
respective elevation bands which are covered by glaciers (i.e. glacier water
equivalent ≥ 0) for each elevation zone."
"""
# Update elevation zone areas
for i, hydro_unit_id in enumerate(np.unique(self.hydro_unit_ids)):
indices = np.where(self.hydro_unit_ids == hydro_unit_id)[0]
if len(indices) != 0:
areas = self.areas_pc_parts[:, indices] * self.catchment_area
self.lookup_table_area[:, i] = np.sum(areas, axis=1)
volumes = self.we_parts[:, indices] * areas / (ICE_WE * 1000)
self.lookup_table_volume[:, i] = np.sum(volumes, axis=1)
else:
self.lookup_table_area[:, i] = 0
self.lookup_table_volume[:, i] = 0
@staticmethod
def _discretize_elevation_bands(
catchment: Catchment,
elevation_bands_distance: int = 10,
hydro_units_needed: bool = True,
) -> tuple[np.ndarray, np.ndarray]:
"""Discretize the DEM into elevation bands at the given distance."""
# Check that the catchment has been discretized
if hydro_units_needed and catchment.map_unit_ids is None:
raise ConfigurationError(
"Catchment has not been discretized. "
"Please run create_elevation_bands() first.",
reason="Catchment not initialized",
)
if (
catchment.hydro_units is not None
and "elevation" in catchment.hydro_units.hydro_units.columns
):
hydro_units = catchment.hydro_units.hydro_units
# Check that the catchment hydro units are consistent with the desired
# elevation_bands_distance parameter
elevations = np.unique(np.sort(hydro_units["elevation"].values.ravel()))
steps = np.diff(elevations)
hu_steps = np.min(steps)
if hu_steps % elevation_bands_distance != 0:
raise ConfigurationError(
f"Hydro unit elevation range ({hu_steps}) must be a "
f"multiple of the elevation bands distance "
f"({elevation_bands_distance}). Please adjust the "
f"elevation_bands_distance parameter.",
item_name="elevation_bands_distance",
item_value=elevation_bands_distance,
reason="Value not compatible with hydro units",
)
# Discretize the DEM into elevation bands at the given distance
min_elevation = hydro_units["elevation"].min().values[0] - hu_steps / 2
max_elevation = hydro_units["elevation"].max().values[0] + hu_steps / 2
else:
dist = elevation_bands_distance
min_elevation = np.nanmin(catchment.dem_data)
min_elevation = min_elevation - (min_elevation % dist)
max_elevation = np.nanmax(catchment.dem_data)
max_elevation = max_elevation + (dist - max_elevation % dist)
elevations = np.arange(
min_elevation,
max_elevation + elevation_bands_distance,
elevation_bands_distance,
)
map_bands_ids = np.zeros(catchment.dem_data.shape)
for i in range(len(elevations) - 1):
val_min = elevations[i]
val_max = elevations[i + 1]
mask_band = np.logical_and(
catchment.dem_data >= val_min, catchment.dem_data < val_max
)
map_bands_ids[mask_band] = i + 1
map_bands_ids = map_bands_ids.astype(rasterio.uint16)
# Set the elevation band values to the middle of the band
elevations = elevations + elevation_bands_distance / 2
return elevations, map_bands_ids
def _extract_glacier_mask_from_shapefile(
self, catchment: Catchment, glacier_outline: str | Path
) -> np.ndarray:
"""Extract the glacier cover from shapefiles."""
# Clip the glaciers to the catchment extent
all_glaciers = gpd.read_file(glacier_outline)
all_glaciers.to_crs(catchment.crs, inplace=True)
if catchment.outline[0].geom_type == "MultiPolygon":
glaciers = gpd.clip(all_glaciers, catchment.outline[0])
elif catchment.outline[0].geom_type == "Polygon":
glaciers = gpd.clip(all_glaciers, MultiPolygon(catchment.outline))
else:
raise DataError(
"The catchment outline must be a (multi)polygon.",
data_type="catchment outline",
reason="Invalid geometry type",
)
glaciers = self._simplify_df_geometries(glaciers)
# Get the glacier mask
glaciers_mask = self._mask_dem(catchment, glaciers, -9999)
return glaciers_mask
@staticmethod
def _get_glacier_patches(
catchment: Catchment, map_bands_ids: np.ndarray, glaciers_mask: np.ndarray
) -> list[tuple[int, int, float]]:
# Extract the pixel size
px_area = catchment.get_dem_pixel_area()
map_bands_ids = np.where(glaciers_mask > 0, map_bands_ids, 0)
band_ids = np.unique(map_bands_ids)
band_ids = band_ids[band_ids != 0]
glacier_patches = []
for band_id in band_ids:
mask_band = map_bands_ids == band_id
# Get the hydro unit ids for the corresponding band
unit_ids = np.unique(catchment.map_unit_ids[mask_band])
unit_ids = unit_ids[unit_ids != 0]
for unit_id in unit_ids:
mask_unit_id = catchment.map_unit_ids[mask_band] == unit_id
area = np.count_nonzero(mask_unit_id) * px_area
glacier_patches.append((band_id, unit_id, area))
return glacier_patches
@staticmethod
def _mask_dem(
catchment: Catchment, shapefile: gpd.GeoDataFrame, nodata: int = -9999
) -> np.ndarray:
geoms = []
for geo in shapefile.geometry.values:
geoms.append(mapping(geo))
dem_masked, _ = mask(catchment.dem, geoms, crop=False, all_touched=True)
dem_masked[dem_masked == catchment.dem.nodata] = nodata
if len(dem_masked.shape) == 3:
dem_masked = dem_masked[0]
return dem_masked
@staticmethod
def _simplify_df_geometries(df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
# Merge the polygons
df["new_col"] = 0
df = df.dissolve(by="new_col", as_index=False)
# Drop all columns except the geometry
df = df[["geometry"]]
return df