from __future__ import annotations
import logging
import warnings
from pathlib import Path
import numpy as np
import pandas as pd
from hydrobricks import gpd
from hydrobricks._exceptions import ConfigurationError, DataError, DependencyError
from hydrobricks._hydrobricks import ActionLandCoverChange as _ActionLandCoverChange
from hydrobricks._optional import HAS_GEOPANDAS, HAS_RASTERIO, HAS_SHAPELY
from hydrobricks._units import Unit, convert_unit
from hydrobricks._utils import compute_area, date_as_mjd
from hydrobricks.actions import Action
from hydrobricks.catchment import Catchment
from hydrobricks.hydro_units import HydroUnits
logger = logging.getLogger(__name__)
if HAS_SHAPELY:
from shapely.geometry import MultiPolygon, mapping
from shapely.ops import unary_union
if HAS_RASTERIO:
from rasterio.mask import mask
m2 = Unit.M2
km2 = Unit.KM2
[docs]
class ActionLandCoverChange(Action):
"""
Class for managing land cover changes over time.
This action tracks changes in land cover areas (e.g., glacier retreat) across
hydro units at specified dates.
"""
def __init__(self) -> None:
"""
Initialize ActionLandCoverChange instance.
This action manages time-dependent land cover changes, updating land cover
areas at specified dates throughout the simulation period.
"""
super().__init__()
self.name: str = "ActionLandCoverChange"
self.action: _ActionLandCoverChange = _ActionLandCoverChange()
[docs]
def load_from_csv(
self,
path: str | Path,
hydro_units: HydroUnits,
land_cover: str,
area_unit: str,
match_with: str = "elevation",
) -> None:
"""
Read land cover changes from a CSV file.
The file should contain changes for a single land cover. Multiple files can be
loaded consecutively. The first column must contain information to identify the
hydro unit ID, such as the ID or elevation. Subsequent columns contain the
changes at different dates. The first line must contain dates in
YYYY-MM-DD format.
Parameters
----------
path
Path to the CSV file containing land cover change data.
hydro_units
The HydroUnits instance to match land cover changes against.
land_cover
Name of the land cover to change (e.g., 'glacier', 'forest').
area_unit
Unit for the area values: 'm2' or 'km2'.
match_with
Information used to identify hydro units. Options: 'elevation', 'id'.
Default: 'elevation'
Raises
------
FileNotFoundError
If the CSV file does not exist.
ValueError
If the first column values don't match hydro unit IDs or match criteria.
KeyError
If required columns are missing from the CSV file.
Examples
--------
>>> action = ActionLandCoverChange()
>>> action.load_from_csv(
... 'glacier_changes.csv',
... hydro_units,
... land_cover='glacier',
... area_unit='km2',
... match_with='elevation'
... )
Example CSV file format (with areas in km²)
-------------------------------------------
elevation 2020-08-01 2025-08-01 2030-08-01 2035-08-01 2040-08-01
4274 0.013 0.003 0 0 0
4310 0.019 0.009 0 0 0
4346 0.052 0.042 0.032 0.022 0.012
4382 0.072 0.062 0.052 0.042 0.032
4418 0.129 0.119 0.109 0.099 0.089
4454 0.252 0.242 0.232 0.222 0.212
4490 0.288 0.278 0.268 0.258 0.248
4526 0.341 0.331 0.321 0.311 0.301
4562 0.613 0.603 0.593 0.583 0.573
"""
file_content = pd.read_csv(path)
if match_with == "id":
# Check that the first column contains hydro unit ids
col_1 = file_content.iloc[:, 0]
id_min = hydro_units.hydro_units[("id", "-")].min()
id_max = hydro_units.hydro_units[("id", "-")].max()
if col_1.min() < id_min or col_1.max() > id_max:
raise DataError(
"The first column of the file does not contain the hydro unit ids.",
data_type="hydro unit",
reason="Invalid hydro unit IDs",
)
# Set the first column name to 'hydro_unit'
file_content.rename(
columns={file_content.columns[0]: "hydro_unit"}, inplace=True
)
else:
file_content.insert(loc=0, column="hydro_unit", value=0)
self._match_hydro_unit_ids(file_content, hydro_units, match_with)
# Drop 2nd column (e.g., elevation)
file_content.drop(file_content.columns[1], axis=1, inplace=True)
self._remove_rows_with_no_changes(file_content)
self._populate_bounded_instance(land_cover, area_unit, file_content)
[docs]
def get_change_count(self) -> int:
"""
Get the number of changes registered.
Returns
-------
int
Total number of land cover changes across all times and hydro units.
"""
return self.action.get_change_count()
[docs]
def get_land_cover_count(self) -> int:
"""
Get the number of land covers registered.
Returns
-------
int
Total number of distinct land cover types with registered changes.
"""
return self.action.get_land_cover_count()
[docs]
@classmethod
def create_action_for_glaciers(
cls,
catchment: Catchment,
times: list[str],
full_glaciers: list[str] | Path,
debris_glaciers: list[str] | Path | None = None,
with_debris: bool = False,
method: str = "vector",
interpolate_yearly: bool = True,
) -> tuple[ActionLandCoverChange, list[pd.DataFrame]]:
"""
Extract glacier cover changes from shapefiles and create an
ActionLandCoverChange object.
This method processes glacier shapefiles at multiple time points, computes
area changes for each hydro unit, and returns a configured action object.
Parameters
----------
catchment
The catchment to extract glacier cover changes for.
times
List of dates in format 'YYYY-MM-DD' corresponding to glacier extent data.
full_glaciers
Path(s) to shapefile(s) containing the extent of all glaciers
(debris-covered and clean ice together). Can be a list of paths (one per
time) or a single path with placeholders.
debris_glaciers
Path(s) to shapefile(s) containing the extent of debris-covered glaciers.
Required if with_debris is True. Default: None
with_debris
If True, distinguishes between debris-covered and clean-ice areas.
If False, treats all glacier area uniformly. Default: False
method
Method to extract glacier cover changes:
- 'vector': Vectorial extraction (more precise but slower)
- 'raster': Raster extraction (faster but less precise)
Default: 'vector'
interpolate_yearly
If True, interpolate changes to yearly time steps between provided dates.
If False, use only the provided time points. Default: True
Returns
-------
tuple[ActionLandCoverChange, list[pd.DataFrame]]
A tuple containing:
- ActionLandCoverChange object configured with extracted cover areas
- List of DataFrames with cover areas. If with_debris is True:
[glacier_ice, glacier_debris, ground]. If False: [glacier, ground].
Raises
------
ImportError
If geopandas is not installed.
ValueError
If catchment.map_unit_ids is None or if data is inconsistent.
FileNotFoundError
If shapefile paths do not exist.
Examples
--------
>>> action, dataframes = ActionLandCoverChange.create_action_for_glaciers(
... catchment=my_catchment,
... times=['2020-08-01', '2030-08-01', '2040-08-01'],
... full_glaciers='glacier_{year}.shp',
... method='vector',
... interpolate_yearly=True
... )
"""
if not HAS_GEOPANDAS:
raise DependencyError(
"geopandas is required for land cover change operations.",
package_name="geopandas",
operation="ActionLandCoverChange.create_action_for_glaciers",
install_command="pip install geopandas",
)
if catchment.map_unit_ids is None:
raise ConfigurationError(
"The catchment has not been discretized (unit ids map missing).",
reason="Catchment not initialized",
)
if catchment.hydro_units is None:
raise ConfigurationError(
"The catchment has not been discretized (hydro units missing).",
reason="Catchment not initialized",
)
changes = ActionLandCoverChange()
changes_df = changes._create_action_for_glaciers(
catchment,
full_glaciers,
debris_glaciers,
times,
with_debris,
method,
interpolate_yearly,
)
return changes, changes_df
def _create_action_for_glaciers(
self,
catchment: Catchment,
full_glaciers: list[str] | Path,
debris_glaciers: list[str] | Path | None,
times: list[str],
with_debris: bool,
method: str,
interpolate_yearly: bool,
) -> list[pd.DataFrame]:
"""
Internal method to create glacier action and extract cover changes.
Parameters
----------
catchment
The catchment to extract glacier cover changes for.
full_glaciers
List of paths to glacier shapefiles (one per time point).
debris_glaciers
List of paths to debris glacier shapefiles or None if with_debris is False.
times
List of dates in format 'YYYY-MM-DD' corresponding to glacier extent data.
with_debris
If True, distinguishes between debris-covered and clean-ice areas.
method
Method to extract glacier cover changes ('vector' or 'raster').
interpolate_yearly
If True, interpolate changes to yearly time steps between provided dates.
Returns
-------
list[pd.DataFrame]
List of DataFrames with cover areas. If with_debris is True:
[glacier_ice, glacier_debris, ground]. If False: [glacier, ground].
Raises
------
ValueError
If input data is inconsistent or mismatched.
"""
if len(full_glaciers) != len(times):
raise DataError(
"The number of shapefiles and dates must be equal.",
data_type="glacier data",
reason="Mismatched array sizes",
)
if with_debris and debris_glaciers is None:
raise DataError(
"The debris shapefiles must be provided.",
data_type="glacier data",
reason="Missing required data",
)
if with_debris and len(debris_glaciers) != len(times):
raise DataError(
"The number of shapefiles and dates must be equal.",
data_type="glacier data",
reason="Mismatched array sizes",
)
if not with_debris and debris_glaciers is None:
debris_glaciers = [None] * len(times)
# Get the hydro units
n_unit_ids = catchment.get_hydro_unit_count()
hydro_units = catchment.hydro_units
# Create the dataframes
glacier_df = self._create_new_change_dataframe(hydro_units, n_unit_ids)
ice_df = self._create_new_change_dataframe(hydro_units, n_unit_ids)
debris_df = self._create_new_change_dataframe(hydro_units, n_unit_ids)
ground_df = self._create_new_change_dataframe(hydro_units, n_unit_ids)
# Parse the files
glacier_np = np.zeros((n_unit_ids, len(times)))
ice_np = np.zeros((n_unit_ids, len(times)))
debris_np = np.zeros((n_unit_ids, len(times)))
ground_np = np.zeros((n_unit_ids, len(times)))
for glacier_shp, debris_shp, time in zip(full_glaciers, debris_glaciers, times):
logger.debug(f"Extracting glacier cover changes for {time}...")
glacier, ice, debris, other = self._extract_glacier_cover_change(
catchment, glacier_shp, debris_shp, method=method
)
glacier_np[:, times.index(time)] = glacier
ice_np[:, times.index(time)] = ice
debris_np[:, times.index(time)] = debris
ground_np[:, times.index(time)] = other
# Interpolate the data to yearly time steps
times_full = pd.to_datetime(times)
if interpolate_yearly:
if with_debris:
ice_np, _ = self._interpolate_yearly(ice_np, times)
debris_np, _ = self._interpolate_yearly(debris_np, times)
else:
glacier_np, _ = self._interpolate_yearly(glacier_np, times)
ground_np, times_full = self._interpolate_yearly(ground_np, times)
# Add the columns to the dataframes
times_full_str = [time.strftime("%Y-%m-%d") for time in times_full]
empty_df = pd.DataFrame(columns=times_full_str)
if with_debris:
ice_df = pd.concat([ice_df, empty_df.copy()], axis=1)
debris_df = pd.concat([debris_df, empty_df.copy()], axis=1)
else:
glacier_df = pd.concat([glacier_df, empty_df.copy()], axis=1)
ground_df = pd.concat([ground_df, empty_df.copy()], axis=1)
# Set data to the dataframes
if with_debris:
ice_df.iloc[:, 1:] = ice_np
debris_df.iloc[:, 1:] = debris_np
else:
glacier_df.iloc[:, 1:] = glacier_np
ground_df.iloc[:, 1:] = ground_np
# Populate the bounded instance (not needed for the ground type)
if with_debris:
self._remove_rows_with_no_changes(ice_df)
self._populate_bounded_instance("glacier_ice", "m2", ice_df)
self._remove_rows_with_no_changes(debris_df)
self._populate_bounded_instance("glacier_debris", "m2", debris_df)
else:
self._remove_rows_with_no_changes(glacier_df)
self._populate_bounded_instance("glacier", "m2", glacier_df)
if with_debris:
return [ice_df, debris_df, ground_df]
else:
return [glacier_df, ground_df]
def _extract_glacier_cover_change(
self,
catchment: Catchment,
glaciers_shapefile: str | Path,
debris_shapefile: str | Path | None,
method: str = "vector",
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Extract the glacier cover changes from shapefiles.
This method clips glacier shapefiles to the catchment extent and computes
the area of glaciers (and optionally debris-covered ice) for each hydro unit
using either vector-based or raster-based methods.
Parameters
----------
catchment
The catchment to extract glacier cover changes for.
glaciers_shapefile
Path to the shapefile containing the extent of all glaciers
(debris-covered and clean ice together).
debris_shapefile
Path to the shapefile containing the extent of debris-covered glaciers only.
If None, no debris distinction is made.
method
The method to extract the glacier cover changes:
- 'vector': Vectorial extraction (more precise but slower)
- 'raster': Raster extraction (faster but less precise)
Default: 'vector'
Returns
-------
tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
A tuple containing four 1D arrays (one per hydro unit):
- glacier_area: Total glacier area (m²) for each hydro unit
- bare_ice_area: Clean-ice area (m²) for each hydro unit
- debris_area: Debris-covered ice area (m²) for each hydro unit
- other_area: Non-glaciated area (m²) for each hydro unit
Raises
------
FileNotFoundError
If shapefile paths do not exist.
ValueError
If catchment outline geometry is not a (multi)polygon or method is invalid.
"""
if method not in ["vector", "raster"]:
raise ConfigurationError(
"Unknown method. Choose either 'vector' or 'raster'.",
item_name="method",
item_value=method,
reason="Invalid method value",
)
# Clip the glaciers to the catchment extent
all_glaciers = gpd.read_file(glaciers_shapefile)
all_glaciers.to_crs(catchment.crs, inplace=True)
if catchment.outline[0].geom_type == "MultiPolygon":
glaciers = gpd.clip(all_glaciers, catchment.outline[0])
elif catchment.outline[0].geom_type == "Polygon":
glaciers = gpd.clip(all_glaciers, MultiPolygon(catchment.outline))
else:
raise DataError(
"The catchment outline must be a (multi)polygon.",
data_type="catchment outline",
reason="Invalid geometry type",
)
glaciers = self._simplify_df_geometries(glaciers)
# Compute the glaciated area of the catchment
glaciated_area = compute_area(glaciers)
non_glaciated_area = catchment.area - glaciated_area
# Compute the debris-covered area of the glacier
glaciers_debris = None
if debris_shapefile is not None:
all_debris_glaciers = gpd.read_file(debris_shapefile)
all_debris_glaciers.to_crs(catchment.crs, inplace=True)
glaciers_debris = gpd.clip(all_debris_glaciers, glaciers)
glaciers_debris = self._simplify_df_geometries(glaciers_debris)
# Display some glacier statistics
logger.debug(
f"The catchment has an area of "
f"{convert_unit(catchment.area, m2, km2):.1f} km², from which "
f"{convert_unit(glaciated_area, m2, km2):.1f} km² are glaciated, "
f"and {convert_unit(non_glaciated_area, m2, km2):.1f} km² are "
f"non glaciated."
)
logger.debug(
f"The catchment is {glaciated_area / catchment.area * 100:.1f}% "
f"glaciated."
)
if debris_shapefile is not None:
debris_glaciated_area = compute_area(glaciers_debris)
bare_ice_area = glaciated_area - debris_glaciated_area
bare_ice_percentage = bare_ice_area / glaciated_area * 100
logger.debug(
f"The glaciers have {convert_unit(bare_ice_area, m2, km2):.1f} km² "
f"of bare ice, and {convert_unit(debris_glaciated_area, m2, km2):.1f}"
f" km² of debris-covered ice, thus amounting to "
f"{bare_ice_percentage:.1f}% of bare ice."
)
# Extract the pixel size
x_size = catchment.get_dem_x_resolution()
y_size = catchment.get_dem_y_resolution()
px_area = catchment.get_dem_pixel_area()
# Define the method to extract the pixels touching the glaciers
if method == "vector":
all_touched = True # Needs to be True to include partly-covered pixels
else:
all_touched = False
logger.debug(
f"The dataset in the CRS {glaciers.crs} has a spatial resolution of "
f"{x_size} m, {y_size} m thus giving pixel areas "
f"of {px_area} m²."
)
# Get the glacier mask
glaciers_mask = self._mask_dem(
catchment, glaciers, nodata=0, all_touched=all_touched
)
debris_mask = None
if debris_shapefile is not None:
debris_mask = self._mask_dem(
catchment, glaciers_debris, nodata=0, all_touched=all_touched
)
unit_ids = catchment.hydro_units.hydro_units[("id", "-")].values
glacier_area = np.zeros(len(unit_ids))
bare_ice_area = np.zeros(len(unit_ids))
debris_area = np.zeros(len(unit_ids))
other_area = np.zeros(len(unit_ids))
for idx, unit_id in enumerate(unit_ids):
mask_unit = catchment.map_unit_ids == unit_id
unit_area = np.sum(mask_unit) * px_area
if method == "vector":
warnings.filterwarnings(
"ignore",
category=RuntimeWarning,
message="invalid value encountered in intersection",
)
# Create an empty list to store the pixel geometries
pixels_geoms = []
# Iterate through the rows and columns of the raster
for i in range(catchment.dem.height):
for j in range(catchment.dem.width):
# Check if there is a glacier pixel
if glaciers_mask[i, j] == 0:
continue
# Check if the pixel value matches the target value
if catchment.map_unit_ids[i, j] != unit_id:
continue
# Create a polygon for the pixel
pixel_geo = catchment.create_dem_pixel_geometry(i, j)
pixels_geoms.append(pixel_geo)
unit_geoms = unary_union(pixels_geoms)
# Iterate through glacier polygons and find intersections
inters_glaciers = []
if len(pixels_geoms) > 0:
self._get_intersections(unit_geoms, glaciers, inters_glaciers)
# Iterate through debris polygons and find intersections
inters_debris = []
if len(pixels_geoms) > 0 and debris_shapefile is not None:
self._get_intersections(unit_geoms, glaciers_debris, inters_debris)
warnings.resetwarnings()
glacier_area[idx] = self._compute_intersection_area(inters_glaciers)
debris_area[idx] = self._compute_intersection_area(inters_debris)
bare_ice_area[idx] = glacier_area[idx] - debris_area[idx]
elif method == "raster":
glacier_area[idx] = np.count_nonzero(glaciers_mask[mask_unit]) * px_area
if debris_shapefile is not None:
debris_area[idx] = (
np.count_nonzero(debris_mask[mask_unit]) * px_area
)
bare_ice_area[idx] = glacier_area[idx] - debris_area[idx]
other_area[idx] = unit_area - glacier_area[idx]
logger.debug(
f"After shapefile extraction (method: {method}), the glaciers have "
f"{convert_unit(np.sum(bare_ice_area), m2, km2):.1f} km² of bare ice, "
f"{convert_unit(np.sum(debris_area), m2, km2):.1f} km² of "
f"debris-covered ice, and "
f"{convert_unit(np.sum(other_area), m2, km2):.1f} km² of "
f"non-glaciated area."
)
return glacier_area, bare_ice_area, debris_area, other_area
@staticmethod
def _interpolate_yearly(
data: np.ndarray, times: list[str]
) -> tuple[np.ndarray, pd.DatetimeIndex]:
"""
Interpolate glacier cover changes to yearly time steps.
Parameters
----------
data
2D array of cover areas with shape (n_units, n_times).
times
List of date strings in format 'YYYY-MM-DD' corresponding to data columns.
Returns
-------
tuple[np.ndarray, pd.DatetimeIndex]
A tuple containing:
- Interpolated data array with yearly time steps
- DatetimeIndex of yearly dates from first to last time point
"""
times_dt = pd.to_datetime(times)
times_full = pd.date_range(times_dt[0], times_dt[-1], freq="YS")
# Create an array with the interpolated values
data_full = np.zeros((data.shape[0], len(times_full)))
for idx, _ in enumerate(data):
data_full[idx, :] = np.interp(times_full, times_dt, data[idx, :])
return data_full, times_full
@staticmethod
def _create_new_change_dataframe(
hydro_units: HydroUnits, n_unit_ids: int
) -> pd.DataFrame:
"""
Create a new DataFrame for storing land cover changes.
Parameters
----------
hydro_units
HydroUnits object containing hydro unit IDs.
n_unit_ids
Number of hydro units.
Returns
-------
pd.DataFrame
DataFrame with hydro unit IDs in the first column, ready for adding
land cover changes at different time steps.
"""
changes_df = pd.DataFrame(index=range(n_unit_ids))
changes_df.insert(loc=0, column="hydro_unit", value=0)
ids = hydro_units.hydro_units[("id", "-")].values.squeeze()
changes_df.loc[:, "hydro_unit"] = ids
return changes_df
@staticmethod
def _compute_intersection_area(intersecting_geoms: list[MultiPolygon]) -> float:
"""
Compute the total area of intersecting geometries.
Parameters
----------
intersecting_geoms
List of MultiPolygon geometries representing intersections.
Returns
-------
float
Total area (in m²) of all intersecting geometries merged together.
"""
if len(intersecting_geoms) == 0:
return 0
# Create a single geometry from all intersecting geometries
merged_geometry = unary_union(intersecting_geoms)
return merged_geometry.area
@staticmethod
def _get_intersections(
pixel_geo: MultiPolygon,
objects: gpd.GeoDataFrame,
intersecting_geoms: list[MultiPolygon],
) -> None:
"""
Find intersections between pixel geometries and GeoDataFrame objects.
Parameters
----------
pixel_geo
The pixel geometry (MultiPolygon) to test for intersections.
objects
GeoDataFrame containing geometries to intersect with the pixel.
intersecting_geoms
List to append intersecting geometries to (modified in place).
Notes
-----
This method modifies the intersecting_geoms list by appending non-empty
intersection geometries found between pixel_geo and each object geometry.
"""
for _, obj in objects.iterrows():
intersection = pixel_geo.intersection(obj["geometry"])
if not intersection.is_empty:
intersecting_geoms.append(intersection)
@staticmethod
def _simplify_df_geometries(df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Simplify and merge geometries in a GeoDataFrame.
Merges all geometries in the GeoDataFrame into a single geometry using
spatial union, then drops all columns except the geometry column.
Parameters
----------
df
GeoDataFrame with geometries to merge.
Returns
-------
gpd.GeoDataFrame
Simplified GeoDataFrame with a single row containing the unioned geometry.
"""
df["new_col"] = 0
df = df.dissolve(by="new_col", as_index=False)
# Drop all columns except the geometry
df = df[["geometry"]]
return df
@staticmethod
def _mask_dem(
catchment: Catchment,
shapefile: gpd.GeoDataFrame,
nodata: float,
all_touched: bool = False,
) -> np.ndarray:
"""
Create a raster mask from shapefile geometries.
Parameters
----------
catchment
The catchment whose DEM extent and resolution are used.
shapefile
GeoDataFrame containing geometries to rasterize.
nodata
Value to assign to cells outside the shapefile geometries.
all_touched
If True, all touched pixels are included. If False, only pixels
whose center is within the geometry are included. Default: False
Returns
-------
np.ndarray
2D binary mask array with same dimensions as DEM, where
geometries are marked with 1 and non-geometry areas with nodata value.
"""
geoms = []
for geo in shapefile.geometry.values:
geoms.append(mapping(geo))
dem_masked, _ = mask(catchment.dem, geoms, crop=False, all_touched=all_touched)
dem_masked[dem_masked == catchment.dem.nodata] = nodata
if len(dem_masked.shape) == 3:
dem_masked = dem_masked[0]
return dem_masked
@staticmethod
def _match_hydro_unit_ids(
file_content: pd.DataFrame, hydro_units: HydroUnits, match_with: str
) -> None:
"""
Match hydro unit identifiers from a DataFrame to hydro unit IDs.
Parameters
----------
file_content
DataFrame with matching criteria (e.g., elevation) in the second column.
hydro_units
HydroUnits object containing hydro unit definitions.
match_with
Criteria for matching: 'elevation' or 'id'.
Raises
------
ValueError
If the match_with criterion is not recognized or if matches are not found.
Notes
-----
This method modifies file_content in place, adding the matched hydro unit ID
to the 'hydro_unit' column.
"""
hu_df = hydro_units.hydro_units
for row, change in file_content.iterrows():
if match_with == "elevation":
elevation_values = hu_df[("elevation", "m")].to_numpy(dtype=np.int64)
value = int(change.iloc[1])
idx_id = hu_df.index[elevation_values == value].to_list()[0]
else:
raise ConfigurationError(
f'No option "{match_with}" for "match_with".',
item_name="match_with",
item_value=match_with,
reason="Invalid option",
)
file_content.at[row, "hydro_unit"] = hu_df.at[idx_id, ("id", "-")]
@staticmethod
def _remove_rows_with_no_changes(file_content: pd.DataFrame) -> None:
"""
Mark rows with no changes as NaN in the DataFrame.
Identifies rows where the land cover area doesn't change between consecutive
dates and marks these unchanged values as NaN.
Parameters
----------
file_content
DataFrame with hydro unit IDs in the first column and areas at different
dates in subsequent columns.
Notes
-----
This method modifies file_content in place, replacing unchanged values with NaN.
"""
for idx, (_, change) in enumerate(file_content.iterrows()):
diff = change.to_numpy(dtype=float)[1:]
diff = diff[0:-1] - diff[1:]
for i_diff, v_diff in enumerate(diff):
if v_diff == 0:
file_content.iloc[idx, i_diff + 2] = np.nan
def _populate_bounded_instance(
self, land_cover: str, area_unit: str, file_content: pd.DataFrame
) -> None:
"""
Populate the internal C++ action instance with land cover change data.
Parameters
----------
land_cover
Name of the land cover to apply changes to.
area_unit
Unit of the area values: 'm2' or 'km2'.
file_content
DataFrame with hydro unit IDs and land cover areas at different dates.
Notes
-----
This method converts all area units to m² before populating the C++ instance
and sets is_initialized to True upon completion.
"""
for col in list(file_content):
if col == "hydro_unit" or col == 0:
continue
# Extract date from column name
mjd = date_as_mjd(col)
for row in range(len(file_content[col])):
hu_id = file_content.loc[row, "hydro_unit"]
area = float(file_content.loc[row, col])
if not np.isnan(area):
area = convert_unit(area, area_unit, Unit.M2)
self.action.add_change(mjd, hu_id, land_cover, area)
self.is_initialized = True