Source code for hydrobricks.actions.action_land_cover_change

from __future__ import annotations

import logging
import warnings
from pathlib import Path

import numpy as np
import pandas as pd

from hydrobricks import gpd
from hydrobricks._exceptions import ConfigurationError, DataError, DependencyError
from hydrobricks._hydrobricks import ActionLandCoverChange as _ActionLandCoverChange
from hydrobricks._optional import HAS_GEOPANDAS, HAS_RASTERIO, HAS_SHAPELY
from hydrobricks._units import Unit, convert_unit
from hydrobricks._utils import compute_area, date_as_mjd
from hydrobricks.actions import Action
from hydrobricks.catchment import Catchment
from hydrobricks.hydro_units import HydroUnits

logger = logging.getLogger(__name__)

if HAS_SHAPELY:
    from shapely.geometry import MultiPolygon, mapping
    from shapely.ops import unary_union

if HAS_RASTERIO:
    from rasterio.mask import mask

m2 = Unit.M2
km2 = Unit.KM2


[docs] class ActionLandCoverChange(Action): """ Class for managing land cover changes over time. This action tracks changes in land cover areas (e.g., glacier retreat) across hydro units at specified dates. """ def __init__(self) -> None: """ Initialize ActionLandCoverChange instance. This action manages time-dependent land cover changes, updating land cover areas at specified dates throughout the simulation period. """ super().__init__() self.name: str = "ActionLandCoverChange" self.action: _ActionLandCoverChange = _ActionLandCoverChange()
[docs] def load_from_csv( self, path: str | Path, hydro_units: HydroUnits, land_cover: str, area_unit: str, match_with: str = "elevation", ) -> None: """ Read land cover changes from a CSV file. The file should contain changes for a single land cover. Multiple files can be loaded consecutively. The first column must contain information to identify the hydro unit ID, such as the ID or elevation. Subsequent columns contain the changes at different dates. The first line must contain dates in YYYY-MM-DD format. Parameters ---------- path Path to the CSV file containing land cover change data. hydro_units The HydroUnits instance to match land cover changes against. land_cover Name of the land cover to change (e.g., 'glacier', 'forest'). area_unit Unit for the area values: 'm2' or 'km2'. match_with Information used to identify hydro units. Options: 'elevation', 'id'. Default: 'elevation' Raises ------ FileNotFoundError If the CSV file does not exist. ValueError If the first column values don't match hydro unit IDs or match criteria. KeyError If required columns are missing from the CSV file. Examples -------- >>> action = ActionLandCoverChange() >>> action.load_from_csv( ... 'glacier_changes.csv', ... hydro_units, ... land_cover='glacier', ... area_unit='km2', ... match_with='elevation' ... ) Example CSV file format (with areas in km²) ------------------------------------------- elevation 2020-08-01 2025-08-01 2030-08-01 2035-08-01 2040-08-01 4274 0.013 0.003 0 0 0 4310 0.019 0.009 0 0 0 4346 0.052 0.042 0.032 0.022 0.012 4382 0.072 0.062 0.052 0.042 0.032 4418 0.129 0.119 0.109 0.099 0.089 4454 0.252 0.242 0.232 0.222 0.212 4490 0.288 0.278 0.268 0.258 0.248 4526 0.341 0.331 0.321 0.311 0.301 4562 0.613 0.603 0.593 0.583 0.573 """ file_content = pd.read_csv(path) if match_with == "id": # Check that the first column contains hydro unit ids col_1 = file_content.iloc[:, 0] id_min = hydro_units.hydro_units[("id", "-")].min() id_max = hydro_units.hydro_units[("id", "-")].max() if col_1.min() < id_min or col_1.max() > id_max: raise DataError( "The first column of the file does not contain the hydro unit ids.", data_type="hydro unit", reason="Invalid hydro unit IDs", ) # Set the first column name to 'hydro_unit' file_content.rename( columns={file_content.columns[0]: "hydro_unit"}, inplace=True ) else: file_content.insert(loc=0, column="hydro_unit", value=0) self._match_hydro_unit_ids(file_content, hydro_units, match_with) # Drop 2nd column (e.g., elevation) file_content.drop(file_content.columns[1], axis=1, inplace=True) self._remove_rows_with_no_changes(file_content) self._populate_bounded_instance(land_cover, area_unit, file_content)
[docs] def get_change_count(self) -> int: """ Get the number of changes registered. Returns ------- int Total number of land cover changes across all times and hydro units. """ return self.action.get_change_count()
[docs] def get_land_cover_count(self) -> int: """ Get the number of land covers registered. Returns ------- int Total number of distinct land cover types with registered changes. """ return self.action.get_land_cover_count()
[docs] @classmethod def create_action_for_glaciers( cls, catchment: Catchment, times: list[str], full_glaciers: list[str] | Path, debris_glaciers: list[str] | Path | None = None, with_debris: bool = False, method: str = "vector", interpolate_yearly: bool = True, ) -> tuple[ActionLandCoverChange, list[pd.DataFrame]]: """ Extract glacier cover changes from shapefiles and create an ActionLandCoverChange object. This method processes glacier shapefiles at multiple time points, computes area changes for each hydro unit, and returns a configured action object. Parameters ---------- catchment The catchment to extract glacier cover changes for. times List of dates in format 'YYYY-MM-DD' corresponding to glacier extent data. full_glaciers Path(s) to shapefile(s) containing the extent of all glaciers (debris-covered and clean ice together). Can be a list of paths (one per time) or a single path with placeholders. debris_glaciers Path(s) to shapefile(s) containing the extent of debris-covered glaciers. Required if with_debris is True. Default: None with_debris If True, distinguishes between debris-covered and clean-ice areas. If False, treats all glacier area uniformly. Default: False method Method to extract glacier cover changes: - 'vector': Vectorial extraction (more precise but slower) - 'raster': Raster extraction (faster but less precise) Default: 'vector' interpolate_yearly If True, interpolate changes to yearly time steps between provided dates. If False, use only the provided time points. Default: True Returns ------- tuple[ActionLandCoverChange, list[pd.DataFrame]] A tuple containing: - ActionLandCoverChange object configured with extracted cover areas - List of DataFrames with cover areas. If with_debris is True: [glacier_ice, glacier_debris, ground]. If False: [glacier, ground]. Raises ------ ImportError If geopandas is not installed. ValueError If catchment.map_unit_ids is None or if data is inconsistent. FileNotFoundError If shapefile paths do not exist. Examples -------- >>> action, dataframes = ActionLandCoverChange.create_action_for_glaciers( ... catchment=my_catchment, ... times=['2020-08-01', '2030-08-01', '2040-08-01'], ... full_glaciers='glacier_{year}.shp', ... method='vector', ... interpolate_yearly=True ... ) """ if not HAS_GEOPANDAS: raise DependencyError( "geopandas is required for land cover change operations.", package_name="geopandas", operation="ActionLandCoverChange.create_action_for_glaciers", install_command="pip install geopandas", ) if catchment.map_unit_ids is None: raise ConfigurationError( "The catchment has not been discretized (unit ids map missing).", reason="Catchment not initialized", ) if catchment.hydro_units is None: raise ConfigurationError( "The catchment has not been discretized (hydro units missing).", reason="Catchment not initialized", ) changes = ActionLandCoverChange() changes_df = changes._create_action_for_glaciers( catchment, full_glaciers, debris_glaciers, times, with_debris, method, interpolate_yearly, ) return changes, changes_df
def _create_action_for_glaciers( self, catchment: Catchment, full_glaciers: list[str] | Path, debris_glaciers: list[str] | Path | None, times: list[str], with_debris: bool, method: str, interpolate_yearly: bool, ) -> list[pd.DataFrame]: """ Internal method to create glacier action and extract cover changes. Parameters ---------- catchment The catchment to extract glacier cover changes for. full_glaciers List of paths to glacier shapefiles (one per time point). debris_glaciers List of paths to debris glacier shapefiles or None if with_debris is False. times List of dates in format 'YYYY-MM-DD' corresponding to glacier extent data. with_debris If True, distinguishes between debris-covered and clean-ice areas. method Method to extract glacier cover changes ('vector' or 'raster'). interpolate_yearly If True, interpolate changes to yearly time steps between provided dates. Returns ------- list[pd.DataFrame] List of DataFrames with cover areas. If with_debris is True: [glacier_ice, glacier_debris, ground]. If False: [glacier, ground]. Raises ------ ValueError If input data is inconsistent or mismatched. """ if len(full_glaciers) != len(times): raise DataError( "The number of shapefiles and dates must be equal.", data_type="glacier data", reason="Mismatched array sizes", ) if with_debris and debris_glaciers is None: raise DataError( "The debris shapefiles must be provided.", data_type="glacier data", reason="Missing required data", ) if with_debris and len(debris_glaciers) != len(times): raise DataError( "The number of shapefiles and dates must be equal.", data_type="glacier data", reason="Mismatched array sizes", ) if not with_debris and debris_glaciers is None: debris_glaciers = [None] * len(times) # Get the hydro units n_unit_ids = catchment.get_hydro_unit_count() hydro_units = catchment.hydro_units # Create the dataframes glacier_df = self._create_new_change_dataframe(hydro_units, n_unit_ids) ice_df = self._create_new_change_dataframe(hydro_units, n_unit_ids) debris_df = self._create_new_change_dataframe(hydro_units, n_unit_ids) ground_df = self._create_new_change_dataframe(hydro_units, n_unit_ids) # Parse the files glacier_np = np.zeros((n_unit_ids, len(times))) ice_np = np.zeros((n_unit_ids, len(times))) debris_np = np.zeros((n_unit_ids, len(times))) ground_np = np.zeros((n_unit_ids, len(times))) for glacier_shp, debris_shp, time in zip(full_glaciers, debris_glaciers, times): logger.debug(f"Extracting glacier cover changes for {time}...") glacier, ice, debris, other = self._extract_glacier_cover_change( catchment, glacier_shp, debris_shp, method=method ) glacier_np[:, times.index(time)] = glacier ice_np[:, times.index(time)] = ice debris_np[:, times.index(time)] = debris ground_np[:, times.index(time)] = other # Interpolate the data to yearly time steps times_full = pd.to_datetime(times) if interpolate_yearly: if with_debris: ice_np, _ = self._interpolate_yearly(ice_np, times) debris_np, _ = self._interpolate_yearly(debris_np, times) else: glacier_np, _ = self._interpolate_yearly(glacier_np, times) ground_np, times_full = self._interpolate_yearly(ground_np, times) # Add the columns to the dataframes times_full_str = [time.strftime("%Y-%m-%d") for time in times_full] empty_df = pd.DataFrame(columns=times_full_str) if with_debris: ice_df = pd.concat([ice_df, empty_df.copy()], axis=1) debris_df = pd.concat([debris_df, empty_df.copy()], axis=1) else: glacier_df = pd.concat([glacier_df, empty_df.copy()], axis=1) ground_df = pd.concat([ground_df, empty_df.copy()], axis=1) # Set data to the dataframes if with_debris: ice_df.iloc[:, 1:] = ice_np debris_df.iloc[:, 1:] = debris_np else: glacier_df.iloc[:, 1:] = glacier_np ground_df.iloc[:, 1:] = ground_np # Populate the bounded instance (not needed for the ground type) if with_debris: self._remove_rows_with_no_changes(ice_df) self._populate_bounded_instance("glacier_ice", "m2", ice_df) self._remove_rows_with_no_changes(debris_df) self._populate_bounded_instance("glacier_debris", "m2", debris_df) else: self._remove_rows_with_no_changes(glacier_df) self._populate_bounded_instance("glacier", "m2", glacier_df) if with_debris: return [ice_df, debris_df, ground_df] else: return [glacier_df, ground_df] def _extract_glacier_cover_change( self, catchment: Catchment, glaciers_shapefile: str | Path, debris_shapefile: str | Path | None, method: str = "vector", ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Extract the glacier cover changes from shapefiles. This method clips glacier shapefiles to the catchment extent and computes the area of glaciers (and optionally debris-covered ice) for each hydro unit using either vector-based or raster-based methods. Parameters ---------- catchment The catchment to extract glacier cover changes for. glaciers_shapefile Path to the shapefile containing the extent of all glaciers (debris-covered and clean ice together). debris_shapefile Path to the shapefile containing the extent of debris-covered glaciers only. If None, no debris distinction is made. method The method to extract the glacier cover changes: - 'vector': Vectorial extraction (more precise but slower) - 'raster': Raster extraction (faster but less precise) Default: 'vector' Returns ------- tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray] A tuple containing four 1D arrays (one per hydro unit): - glacier_area: Total glacier area (m²) for each hydro unit - bare_ice_area: Clean-ice area (m²) for each hydro unit - debris_area: Debris-covered ice area (m²) for each hydro unit - other_area: Non-glaciated area (m²) for each hydro unit Raises ------ FileNotFoundError If shapefile paths do not exist. ValueError If catchment outline geometry is not a (multi)polygon or method is invalid. """ if method not in ["vector", "raster"]: raise ConfigurationError( "Unknown method. Choose either 'vector' or 'raster'.", item_name="method", item_value=method, reason="Invalid method value", ) # Clip the glaciers to the catchment extent all_glaciers = gpd.read_file(glaciers_shapefile) all_glaciers.to_crs(catchment.crs, inplace=True) if catchment.outline[0].geom_type == "MultiPolygon": glaciers = gpd.clip(all_glaciers, catchment.outline[0]) elif catchment.outline[0].geom_type == "Polygon": glaciers = gpd.clip(all_glaciers, MultiPolygon(catchment.outline)) else: raise DataError( "The catchment outline must be a (multi)polygon.", data_type="catchment outline", reason="Invalid geometry type", ) glaciers = self._simplify_df_geometries(glaciers) # Compute the glaciated area of the catchment glaciated_area = compute_area(glaciers) non_glaciated_area = catchment.area - glaciated_area # Compute the debris-covered area of the glacier glaciers_debris = None if debris_shapefile is not None: all_debris_glaciers = gpd.read_file(debris_shapefile) all_debris_glaciers.to_crs(catchment.crs, inplace=True) glaciers_debris = gpd.clip(all_debris_glaciers, glaciers) glaciers_debris = self._simplify_df_geometries(glaciers_debris) # Display some glacier statistics logger.debug( f"The catchment has an area of " f"{convert_unit(catchment.area, m2, km2):.1f} km², from which " f"{convert_unit(glaciated_area, m2, km2):.1f} km² are glaciated, " f"and {convert_unit(non_glaciated_area, m2, km2):.1f} km² are " f"non glaciated." ) logger.debug( f"The catchment is {glaciated_area / catchment.area * 100:.1f}% " f"glaciated." ) if debris_shapefile is not None: debris_glaciated_area = compute_area(glaciers_debris) bare_ice_area = glaciated_area - debris_glaciated_area bare_ice_percentage = bare_ice_area / glaciated_area * 100 logger.debug( f"The glaciers have {convert_unit(bare_ice_area, m2, km2):.1f} km² " f"of bare ice, and {convert_unit(debris_glaciated_area, m2, km2):.1f}" f" km² of debris-covered ice, thus amounting to " f"{bare_ice_percentage:.1f}% of bare ice." ) # Extract the pixel size x_size = catchment.get_dem_x_resolution() y_size = catchment.get_dem_y_resolution() px_area = catchment.get_dem_pixel_area() # Define the method to extract the pixels touching the glaciers if method == "vector": all_touched = True # Needs to be True to include partly-covered pixels else: all_touched = False logger.debug( f"The dataset in the CRS {glaciers.crs} has a spatial resolution of " f"{x_size} m, {y_size} m thus giving pixel areas " f"of {px_area} m²." ) # Get the glacier mask glaciers_mask = self._mask_dem( catchment, glaciers, nodata=0, all_touched=all_touched ) debris_mask = None if debris_shapefile is not None: debris_mask = self._mask_dem( catchment, glaciers_debris, nodata=0, all_touched=all_touched ) unit_ids = catchment.hydro_units.hydro_units[("id", "-")].values glacier_area = np.zeros(len(unit_ids)) bare_ice_area = np.zeros(len(unit_ids)) debris_area = np.zeros(len(unit_ids)) other_area = np.zeros(len(unit_ids)) for idx, unit_id in enumerate(unit_ids): mask_unit = catchment.map_unit_ids == unit_id unit_area = np.sum(mask_unit) * px_area if method == "vector": warnings.filterwarnings( "ignore", category=RuntimeWarning, message="invalid value encountered in intersection", ) # Create an empty list to store the pixel geometries pixels_geoms = [] # Iterate through the rows and columns of the raster for i in range(catchment.dem.height): for j in range(catchment.dem.width): # Check if there is a glacier pixel if glaciers_mask[i, j] == 0: continue # Check if the pixel value matches the target value if catchment.map_unit_ids[i, j] != unit_id: continue # Create a polygon for the pixel pixel_geo = catchment.create_dem_pixel_geometry(i, j) pixels_geoms.append(pixel_geo) unit_geoms = unary_union(pixels_geoms) # Iterate through glacier polygons and find intersections inters_glaciers = [] if len(pixels_geoms) > 0: self._get_intersections(unit_geoms, glaciers, inters_glaciers) # Iterate through debris polygons and find intersections inters_debris = [] if len(pixels_geoms) > 0 and debris_shapefile is not None: self._get_intersections(unit_geoms, glaciers_debris, inters_debris) warnings.resetwarnings() glacier_area[idx] = self._compute_intersection_area(inters_glaciers) debris_area[idx] = self._compute_intersection_area(inters_debris) bare_ice_area[idx] = glacier_area[idx] - debris_area[idx] elif method == "raster": glacier_area[idx] = np.count_nonzero(glaciers_mask[mask_unit]) * px_area if debris_shapefile is not None: debris_area[idx] = ( np.count_nonzero(debris_mask[mask_unit]) * px_area ) bare_ice_area[idx] = glacier_area[idx] - debris_area[idx] other_area[idx] = unit_area - glacier_area[idx] logger.debug( f"After shapefile extraction (method: {method}), the glaciers have " f"{convert_unit(np.sum(bare_ice_area), m2, km2):.1f} km² of bare ice, " f"{convert_unit(np.sum(debris_area), m2, km2):.1f} km² of " f"debris-covered ice, and " f"{convert_unit(np.sum(other_area), m2, km2):.1f} km² of " f"non-glaciated area." ) return glacier_area, bare_ice_area, debris_area, other_area @staticmethod def _interpolate_yearly( data: np.ndarray, times: list[str] ) -> tuple[np.ndarray, pd.DatetimeIndex]: """ Interpolate glacier cover changes to yearly time steps. Parameters ---------- data 2D array of cover areas with shape (n_units, n_times). times List of date strings in format 'YYYY-MM-DD' corresponding to data columns. Returns ------- tuple[np.ndarray, pd.DatetimeIndex] A tuple containing: - Interpolated data array with yearly time steps - DatetimeIndex of yearly dates from first to last time point """ times_dt = pd.to_datetime(times) times_full = pd.date_range(times_dt[0], times_dt[-1], freq="YS") # Create an array with the interpolated values data_full = np.zeros((data.shape[0], len(times_full))) for idx, _ in enumerate(data): data_full[idx, :] = np.interp(times_full, times_dt, data[idx, :]) return data_full, times_full @staticmethod def _create_new_change_dataframe( hydro_units: HydroUnits, n_unit_ids: int ) -> pd.DataFrame: """ Create a new DataFrame for storing land cover changes. Parameters ---------- hydro_units HydroUnits object containing hydro unit IDs. n_unit_ids Number of hydro units. Returns ------- pd.DataFrame DataFrame with hydro unit IDs in the first column, ready for adding land cover changes at different time steps. """ changes_df = pd.DataFrame(index=range(n_unit_ids)) changes_df.insert(loc=0, column="hydro_unit", value=0) ids = hydro_units.hydro_units[("id", "-")].values.squeeze() changes_df.loc[:, "hydro_unit"] = ids return changes_df @staticmethod def _compute_intersection_area(intersecting_geoms: list[MultiPolygon]) -> float: """ Compute the total area of intersecting geometries. Parameters ---------- intersecting_geoms List of MultiPolygon geometries representing intersections. Returns ------- float Total area (in m²) of all intersecting geometries merged together. """ if len(intersecting_geoms) == 0: return 0 # Create a single geometry from all intersecting geometries merged_geometry = unary_union(intersecting_geoms) return merged_geometry.area @staticmethod def _get_intersections( pixel_geo: MultiPolygon, objects: gpd.GeoDataFrame, intersecting_geoms: list[MultiPolygon], ) -> None: """ Find intersections between pixel geometries and GeoDataFrame objects. Parameters ---------- pixel_geo The pixel geometry (MultiPolygon) to test for intersections. objects GeoDataFrame containing geometries to intersect with the pixel. intersecting_geoms List to append intersecting geometries to (modified in place). Notes ----- This method modifies the intersecting_geoms list by appending non-empty intersection geometries found between pixel_geo and each object geometry. """ for _, obj in objects.iterrows(): intersection = pixel_geo.intersection(obj["geometry"]) if not intersection.is_empty: intersecting_geoms.append(intersection) @staticmethod def _simplify_df_geometries(df: gpd.GeoDataFrame) -> gpd.GeoDataFrame: """ Simplify and merge geometries in a GeoDataFrame. Merges all geometries in the GeoDataFrame into a single geometry using spatial union, then drops all columns except the geometry column. Parameters ---------- df GeoDataFrame with geometries to merge. Returns ------- gpd.GeoDataFrame Simplified GeoDataFrame with a single row containing the unioned geometry. """ df["new_col"] = 0 df = df.dissolve(by="new_col", as_index=False) # Drop all columns except the geometry df = df[["geometry"]] return df @staticmethod def _mask_dem( catchment: Catchment, shapefile: gpd.GeoDataFrame, nodata: float, all_touched: bool = False, ) -> np.ndarray: """ Create a raster mask from shapefile geometries. Parameters ---------- catchment The catchment whose DEM extent and resolution are used. shapefile GeoDataFrame containing geometries to rasterize. nodata Value to assign to cells outside the shapefile geometries. all_touched If True, all touched pixels are included. If False, only pixels whose center is within the geometry are included. Default: False Returns ------- np.ndarray 2D binary mask array with same dimensions as DEM, where geometries are marked with 1 and non-geometry areas with nodata value. """ geoms = [] for geo in shapefile.geometry.values: geoms.append(mapping(geo)) dem_masked, _ = mask(catchment.dem, geoms, crop=False, all_touched=all_touched) dem_masked[dem_masked == catchment.dem.nodata] = nodata if len(dem_masked.shape) == 3: dem_masked = dem_masked[0] return dem_masked @staticmethod def _match_hydro_unit_ids( file_content: pd.DataFrame, hydro_units: HydroUnits, match_with: str ) -> None: """ Match hydro unit identifiers from a DataFrame to hydro unit IDs. Parameters ---------- file_content DataFrame with matching criteria (e.g., elevation) in the second column. hydro_units HydroUnits object containing hydro unit definitions. match_with Criteria for matching: 'elevation' or 'id'. Raises ------ ValueError If the match_with criterion is not recognized or if matches are not found. Notes ----- This method modifies file_content in place, adding the matched hydro unit ID to the 'hydro_unit' column. """ hu_df = hydro_units.hydro_units for row, change in file_content.iterrows(): if match_with == "elevation": elevation_values = hu_df[("elevation", "m")].to_numpy(dtype=np.int64) value = int(change.iloc[1]) idx_id = hu_df.index[elevation_values == value].to_list()[0] else: raise ConfigurationError( f'No option "{match_with}" for "match_with".', item_name="match_with", item_value=match_with, reason="Invalid option", ) file_content.at[row, "hydro_unit"] = hu_df.at[idx_id, ("id", "-")] @staticmethod def _remove_rows_with_no_changes(file_content: pd.DataFrame) -> None: """ Mark rows with no changes as NaN in the DataFrame. Identifies rows where the land cover area doesn't change between consecutive dates and marks these unchanged values as NaN. Parameters ---------- file_content DataFrame with hydro unit IDs in the first column and areas at different dates in subsequent columns. Notes ----- This method modifies file_content in place, replacing unchanged values with NaN. """ for idx, (_, change) in enumerate(file_content.iterrows()): diff = change.to_numpy(dtype=float)[1:] diff = diff[0:-1] - diff[1:] for i_diff, v_diff in enumerate(diff): if v_diff == 0: file_content.iloc[idx, i_diff + 2] = np.nan def _populate_bounded_instance( self, land_cover: str, area_unit: str, file_content: pd.DataFrame ) -> None: """ Populate the internal C++ action instance with land cover change data. Parameters ---------- land_cover Name of the land cover to apply changes to. area_unit Unit of the area values: 'm2' or 'km2'. file_content DataFrame with hydro unit IDs and land cover areas at different dates. Notes ----- This method converts all area units to m² before populating the C++ instance and sets is_initialized to True upon completion. """ for col in list(file_content): if col == "hydro_unit" or col == 0: continue # Extract date from column name mjd = date_as_mjd(col) for row in range(len(file_content[col])): hu_id = file_content.loc[row, "hydro_unit"] area = float(file_content.loc[row, col]) if not np.isnan(area): area = convert_unit(area, area_unit, Unit.M2) self.action.add_change(mjd, hu_id, land_cover, area) self.is_initialized = True