from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
import numpy as np
import pandas as pd
from hydrobricks._constants import ICE_WE
from hydrobricks._exceptions import ConfigurationError, DependencyError
from hydrobricks._optional import HAS_PYPROJ
if TYPE_CHECKING:
from hydrobricks.catchment import Catchment
[docs]
class GlacierEvolutionAreaScaling:
"""
Class for a simple volume-area scaling approach for glacier evolution. It computes
the area per hydro unit (e.g. elevation band) for an incremental melt per band.
It does not account for the glacier dynamic and should not be used for medium
to large glaciers. The delta-h method is preferred for such cases.
"""
def __init__(self) -> None:
"""
Initialize the GlacierMassBalance class.
"""
self.glacier_df: pd.DataFrame | None = None
self.hydro_units: pd.DataFrame | None = None
self.hydro_unit_ids: np.ndarray | None = None
self.catchment_area: float | None = None
# Tables
self.lookup_table_area: np.ndarray | None = None
self.lookup_table_volume: np.ndarray | None = None
self.we: np.ndarray | None = None
self.areas_pc: np.ndarray | None = None
# Pixel-wise ice water equivalent (we) for each hydro unit
self.px_ice_we: np.ndarray | None = None
[docs]
def compute_lookup_table(
self, catchment: Catchment, ice_thickness: str | Path, nb_increments: int = 200
) -> None:
"""
Extract the initial ice thickness to be used in compute_lookup_table()
for the glacier mass balance calculation.
Computes glacier area and volume evolution lookup tables based on incremental
melting using a simple volume-area scaling approach. Results stored in
lookup_table_area and lookup_table_volume attributes.
Parameters
----------
catchment
The catchment object.
ice_thickness
Path to the TIF file containing the glacier thickness in meters.
nb_increments
Number of increments for glacier mass balance calculation. Default is 200.
"""
# Check that the catchment has been discretized
if catchment.map_unit_ids is None:
raise ConfigurationError(
"Catchment has not been discretized. "
"Please run create_elevation_bands() first.",
reason="Catchment not initialized",
)
# Extract the ice thickness from a TIF file.
if not HAS_PYPROJ:
raise DependencyError(
"pyproj is required to extract glacier thickness.",
package_name="pyproj",
operation="GlacierEvolutionAreaScaling.compute_lookup_table",
install_command="pip install pyproj",
)
self.hydro_units = catchment.hydro_units.hydro_units
self.catchment_area = np.sum(self.hydro_units.area.values)
self._extract_ice_thickness(catchment, ice_thickness)
self._initialization(nb_increments)
# Compute the ice w.e. mass to remove per increment
mass_change = self.we[0, :] * self.areas_pc[0, :] / nb_increments
for increment in range(1, nb_increments): # last row kept with zeros
self._update_glacier_thickness(increment, mass_change)
self._width_scaling(increment, catchment=catchment)
self._update_lookup_tables()
[docs]
def get_lookup_table_area(self) -> pd.DataFrame:
"""
Get the glacier area evolution lookup table.
Returns
-------
The glacier area evolution lookup table.
"""
return pd.DataFrame(
self.lookup_table_area,
index=range(self.lookup_table_area.shape[0]),
columns=self.hydro_unit_ids,
)
[docs]
def get_lookup_table_volume(self) -> pd.DataFrame:
"""
Get the glacier volume evolution lookup table.
Returns
-------
The glacier volume evolution lookup table.
"""
return pd.DataFrame(
self.lookup_table_volume,
index=range(self.lookup_table_volume.shape[0]),
columns=self.hydro_unit_ids,
)
[docs]
def save_as_csv(self, output_dir: str | Path) -> None:
"""
Save the glacier evolution lookup table as a CSV file.
Writes three CSV files: glacier area evolution, glacier volume evolution,
and optional detailed glacier area and water equivalent evolution per pixel.
Parameters
----------
output_dir
Path to the directory where the CSV file will be saved.
"""
output_dir = Path(output_dir)
# Write record to the lookup table
lookup_table_area = pd.DataFrame(
self.lookup_table_area,
index=range(self.lookup_table_area.shape[0]),
columns=self.hydro_unit_ids,
)
lookup_table_area.to_csv(
output_dir / "glacier_evolution_lookup_table_area.csv", index=False
)
lookup_table_volume = pd.DataFrame(
self.lookup_table_volume,
index=range(self.lookup_table_volume.shape[0]),
columns=self.hydro_unit_ids,
)
lookup_table_volume.to_csv(
output_dir / "glacier_evolution_lookup_table_volume.csv", index=False
)
if self.areas_pc is not None:
columns = pd.MultiIndex.from_arrays(
[range(len(self.areas_pc[0])), self.hydro_unit_ids],
names=["id", "hydro_unit_id"],
)
details_glacier_areas = pd.DataFrame(
self.areas_pc * self.catchment_area,
index=range(self.areas_pc.shape[0]),
columns=columns,
)
details_glacier_areas.to_csv(
output_dir / "details_glacier_areas_evolution.csv", index=False
)
if self.we is not None:
columns = pd.MultiIndex.from_arrays(
[range(len(self.we[0])), self.hydro_unit_ids],
names=["id", "hydro_unit_id"],
)
details_glacier_we = pd.DataFrame(
self.we, index=range(self.we.shape[0]), columns=columns
)
details_glacier_we.to_csv(
output_dir / "details_glacier_we_evolution.csv", index=False
)
def _extract_ice_thickness(
self, catchment: Catchment, ice_thickness: str | Path
) -> None:
"""
Extract ice thickness from raster and create glacier patches.
Loads ice thickness data from a raster file, resamples it to DEM resolution,
identifies glacier areas, and creates glacier patches for each hydro unit.
Stores result in self.glacier_df and self.px_ice_we.
Parameters
----------
catchment
The catchment object with map_unit_ids defined.
ice_thickness
Path to ice thickness raster file.
"""
catchment.extract_attribute_raster(
ice_thickness,
"ice_thickness",
resample_to_dem_resolution=True,
resampling="average",
)
ice_thickness = catchment.attributes["ice_thickness"]["data"]
ice_thickness[catchment.dem_data == 0] = 0.0
glaciers_mask = np.zeros(catchment.dem_data.shape)
glaciers_mask[ice_thickness > 0] = 1
glacier_patches = self._get_glacier_patches(catchment, glaciers_mask)
# Create the dataframe for the glacier data
glacier_df = None
for unit_id, area in glacier_patches:
new_row = pd.DataFrame(
[[unit_id, area, 0.0]],
columns=[
("hydro_unit_id", "-"),
("glacier_area", "m2"),
("glacier_thickness", "m"),
],
)
if glacier_df is None:
glacier_df = new_row
else:
glacier_df = pd.concat([glacier_df, new_row], ignore_index=True)
self.px_ice_we = np.empty((1, len(glacier_df)), dtype=object)
# Update the dataframe with the ice thickness
for i_row, (idx, row) in enumerate(glacier_df.iterrows()):
unit_id = row[("hydro_unit_id", "-")]
# Get the ice thickness for the corresponding band and unit
mask_unit = catchment.map_unit_ids == unit_id
masked_thickness = ice_thickness[mask_unit]
masked_thickness = masked_thickness[masked_thickness > 0]
masked_thickness = masked_thickness[~np.isnan(masked_thickness)]
self.px_ice_we[0, i_row] = masked_thickness * ICE_WE * 1000
# Compute the mean thickness
if masked_thickness.size > 0:
mean_thickness = float(np.mean(masked_thickness))
else:
mean_thickness = 0.0
glacier_df.at[idx, ("glacier_thickness", "m")] = mean_thickness
self.glacier_df = glacier_df
def _initialization(self, nb_increments: int) -> None:
"""
Initialize lookup tables and arrays for incremental glacier evolution.
Sets up data structures to track glacier water equivalent, area percentages,
and lookup tables for glacier area and volume evolution over increments.
Parameters
----------
nb_increments
Number of increments for evolution computation.
"""
# Add rows corresponding to the number of increments
cols = self.px_ice_we.shape[1]
new_rows = np.empty((nb_increments, cols), dtype=object)
self.px_ice_we = np.vstack([self.px_ice_we, new_rows])
# Extract the relevant columns
initial_areas_m2 = self.glacier_df[("glacier_area", "m2")].values
initial_we_mm = self.glacier_df[("glacier_thickness", "m")].values
initial_we_mm = initial_we_mm * ICE_WE * 1000
hydro_unit_ids = self.glacier_df[("hydro_unit_id", "-")].values
self.hydro_unit_ids = hydro_unit_ids
self.we = np.zeros((nb_increments + 1, len(hydro_unit_ids)))
self.we[0] = initial_we_mm
self.areas_pc = np.zeros((nb_increments + 1, len(hydro_unit_ids)))
self.areas_pc[0] = initial_areas_m2 / self.catchment_area
self.lookup_table_area = np.zeros((nb_increments + 1, len(hydro_unit_ids)))
self.lookup_table_volume = np.zeros((nb_increments + 1, len(hydro_unit_ids)))
def _update_glacier_thickness(
self, increment: int, mass_change: np.ndarray
) -> None:
"""
Update glacier water equivalent for an increment of mass loss.
Distributes mass change across glacier pixels in each hydro unit,
ensuring that ice water equivalent never becomes negative by redistributing
excess melt to remaining pixels.
Parameters
----------
increment
Current increment number.
mass_change
Array of mass change per hydro unit for this increment.
"""
# Copy the previous increment values as starting point
self.we[increment, :] = self.we[increment - 1, :]
self.px_ice_we[increment, :] = self.px_ice_we[increment - 1, :].copy()
# Compute the melt
for i_unit in np.arange(len(self.hydro_unit_ids)):
change = mass_change[i_unit]
melt = change / self.areas_pc[increment - 1, i_unit]
# Update the glacier water equivalent of the pixels
px_values = self.px_ice_we[increment, i_unit]
px_values -= melt
# Ensure that the pixel-wise ice water equivalent is not negative
while np.any(px_values < 0):
px_melt_deficit = -np.sum(px_values[px_values < 0])
px_melt = px_melt_deficit / len(px_values[px_values > 0])
px_values = np.where(px_values <= 0, 0, px_values - px_melt)
self.px_ice_we[increment, i_unit] = px_values
# Update the glacier water equivalent for the hydro unit
new_we = np.mean(px_values)
ref_we = self.we[increment, i_unit]
assert np.isclose(new_we, ref_we - melt, atol=1e-02), (
f"Mismatch in glacier w.e. for unit {i_unit} "
f"({ref_we - melt} != {new_we})"
)
self.we[increment, i_unit] = new_we
def _width_scaling(self, increment: int, catchment: Catchment) -> None:
"""
Update glacier area using width-scaling approach.
Recalculates glacier areas for each hydro unit based on remaining pixels with
non-zero ice water equivalent. Removes pixels with zero ice and updates the
glacier water equivalent values.
Parameters
----------
increment
Current increment number.
catchment
The catchment object with DEM pixel area information.
"""
self.areas_pc[increment] = self.areas_pc[increment - 1]
px_area = catchment.get_dem_pixel_area()
# Update the glacier area based on the pixel-wise ice water equivalent
areas = np.zeros(len(self.hydro_unit_ids))
for i in range(len(self.hydro_unit_ids)):
ice_we = self.px_ice_we[increment, i]
areas[i] = np.count_nonzero(ice_we) * px_area
# Drop pixels with no ice water equivalent
self.px_ice_we[increment, i] = ice_we[ice_we > 0]
self.areas_pc[increment, :] = areas / self.catchment_area
# Adjust the glacier ice w.e. per part (px with 0 ice w.e. were dropped)
self.we[increment] = np.array(
[np.mean(arr) if arr.size > 0 else 0 for arr in self.px_ice_we[increment]]
)
def _update_lookup_tables(self) -> None:
"""
Update lookup tables with glacier area and volume for each increment.
Aggregates glacier areas and volumes across hydro units based on elevation
bands. Step 5 (6) of Seibert et al. (2018): "Sum the total (width-scaled)
areas for all respective elevation bands which are covered by glaciers
(i.e. glacier water equivalent ≥ 0) for each elevation zone."
"""
# Update elevation zone areas
for i, hydro_unit_id in enumerate(np.unique(self.hydro_unit_ids)):
indices = np.where(self.hydro_unit_ids == hydro_unit_id)[0]
if len(indices) != 0:
areas = self.areas_pc[:, indices] * self.catchment_area
self.lookup_table_area[:, i] = np.sum(areas, axis=1)
volumes = self.we[:, indices] * areas / (ICE_WE * 1000)
self.lookup_table_volume[:, i] = np.sum(volumes, axis=1)
else:
self.lookup_table_area[:, i] = 0
self.lookup_table_volume[:, i] = 0
@staticmethod
def _get_glacier_patches(
catchment: Catchment, glaciers_mask: np.ndarray
) -> list[tuple[int, float]]:
"""
Extract glacier patches for each hydro unit from a glacier mask.
Identifies contiguous glacier areas within each hydro unit and calculates
their areas based on DEM pixel size.
Parameters
----------
catchment
The catchment object with map_unit_ids and DEM information.
glaciers_mask
Binary mask where 1 indicates glacier presence.
Returns
-------
list[tuple[int, float]]
List of (hydro_unit_id, glacier_area) tuples.
"""
# Extract the pixel size
px_area = catchment.get_dem_pixel_area()
glacier_patches = []
# Get the hydro unit ids for the glacier mask
unit_ids = np.unique(catchment.map_unit_ids[glaciers_mask > 0])
unit_ids = unit_ids[unit_ids != 0]
for unit_id in unit_ids:
mask_unit_id = catchment.map_unit_ids[glaciers_mask > 0] == unit_id
area = np.count_nonzero(mask_unit_id) * px_area
glacier_patches.append((unit_id, area))
return glacier_patches