from __future__ import annotations
import math
import warnings
from pathlib import Path
from typing import TYPE_CHECKING
import numpy as np
from hydrobricks import rasterio, rxr, xrs
from hydrobricks._exceptions import ConfigurationError, DependencyError
from hydrobricks._optional import HAS_PYARROW, HAS_RASTERIO, HAS_SHAPELY, HAS_XRSPATIAL
if TYPE_CHECKING:
from hydrobricks.catchment import Catchment
if HAS_RASTERIO:
from rasterio.enums import Resampling
from rasterio.mask import mask
if HAS_SHAPELY:
from shapely.geometry import mapping
[docs]
class CatchmentTopography:
"""
A class to represent the topography of a catchment.
"""
def __init__(self, catchment: Catchment) -> None:
"""
Initialize the Topography class.
Parameters
----------
catchment
The catchment object.
"""
self.catchment: Catchment = catchment
self.slope: np.ndarray | None = None
self.aspect: np.ndarray | None = None
[docs]
def get_mean_elevation(self) -> float:
"""
Get the catchment mean elevation.
Computes the mean elevation across all DEM cells within the catchment,
ignoring NaN values.
Returns
-------
float
The catchment mean elevation in meters.
"""
return np.nanmean(self.catchment.dem_data)
[docs]
def resample_dem_and_calculate_slope_aspect(
self, resolution: float, output_path: str | Path | None = None
) -> tuple[rasterio.DatasetReader, np.ndarray, np.ndarray, np.ndarray]:
"""
Resample the DEM and calculate the slope and aspect of the whole DEM.
Downsamples the DEM to a specified resolution using bilinear interpolation,
then computes slope and aspect for the resampled DEM. Results are saved to
disk and can be used for coarser resolution calculations.
Parameters
----------
resolution
Desired pixel resolution in meters. If None or matches current resolution,
uses the original DEM without resampling.
output_path
Path of the directory to save the downsampled DEM to. Required if
resolution differs from current DEM resolution.
Returns
-------
tuple[rasterio.DatasetReader, np.ndarray, np.ndarray, np.ndarray]
Tuple containing:
- Resampled DEM dataset
- Masked DEM data array
- Slope array (in degrees)
- Aspect array (in degrees, 0-360)
"""
if not HAS_RASTERIO:
raise DependencyError(
"rasterio is required for DEM resampling and processing.",
package_name="rasterio",
operation="CatchmentTopography.resample_dem_and_calculate_slope_aspect",
install_command="pip install rasterio",
)
if not HAS_PYARROW:
raise DependencyError(
"pyarrow is required for DEM resampling.",
package_name="pyarrow",
operation="CatchmentTopography.resample_dem_and_calculate_slope_aspect",
install_command="pip install pyarrow",
)
if not HAS_XRSPATIAL:
raise DependencyError(
"xarray-spatial is required for slope and aspect calculations.",
package_name="xarray-spatial",
operation="CatchmentTopography.resample_dem_and_calculate_slope_aspect",
install_command="pip install xarray-spatial",
)
# Only resample the DEM if the resolution is different from the original
if resolution is None or resolution == self.catchment.get_dem_x_resolution():
if self.slope is None or self.aspect is None:
self.calculate_slope_aspect()
return self.catchment.dem, self.catchment.dem_data, self.slope, self.aspect
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UserWarning) # pyproj
dem_file = self.catchment.dem.files[0]
with rxr.open_rasterio(dem_file) as _raw:
xr_dem = _raw.drop_vars("band")[0].load()
x_downscale_factor = self.catchment.get_dem_x_resolution() / resolution
y_downscale_factor = self.catchment.get_dem_y_resolution() / resolution
new_width = int(xr_dem.rio.width * x_downscale_factor)
new_height = int(xr_dem.rio.height * y_downscale_factor)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UserWarning) # pyproj
xr_dem_downsampled = xr_dem.rio.reproject(
xr_dem.rio.crs,
shape=(new_height, new_width),
resampling=Resampling.bilinear,
)
# Save the downsampled DEM to a file
if isinstance(output_path, str):
output_path = Path(output_path)
filepath = output_path / "downsampled_dem.tif"
xr_dem_downsampled.rio.to_raster(filepath)
# Reopen the downsampled DEM as a rasterio dataset
new_dem = rasterio.open(filepath)
if self.catchment.outline is not None:
geoms = [mapping(polygon) for polygon in self.catchment.outline]
new_dem_data, _ = mask(new_dem, geoms, crop=False)
else:
new_dem_data = new_dem.read(1)
new_dem_data[new_dem_data == new_dem.nodata] = np.nan
if len(new_dem_data.shape) == 3:
new_dem_data = new_dem_data[0]
new_slope = xrs.slope(xr_dem_downsampled, name="slope").to_numpy()
new_aspect = xrs.aspect(xr_dem_downsampled, name="aspect").to_numpy()
xr_dem_downsampled.close()
return new_dem, new_dem_data, new_slope, new_aspect
[docs]
def calculate_slope_aspect(self) -> None:
"""
Calculate the slope and aspect of the whole DEM.
Computes slope in degrees and aspect (0-360°) from the DEM using
spatial derivatives. Results stored in self.slope and self.aspect.
"""
if not HAS_PYARROW:
raise DependencyError(
"pyarrow is required for slope and aspect calculations.",
package_name="pyarrow",
operation="CatchmentTopography.calculate_slope_aspect",
install_command="pip install pyarrow",
)
if not HAS_XRSPATIAL:
raise DependencyError(
"xarray-spatial is required for slope and aspect calculations.",
package_name="xarray-spatial",
operation="CatchmentTopography.calculate_slope_aspect",
install_command="pip install xarray-spatial",
)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UserWarning) # pyproj
dem_file = self.catchment.dem.files[0]
with rxr.open_rasterio(dem_file) as _raw:
xr_dem = _raw.drop_vars("band")[0]
self.slope = xrs.slope(xr_dem, name="slope").to_numpy()
self.aspect = xrs.aspect(xr_dem, name="aspect").to_numpy()
[docs]
def get_hillshade(
self, azimuth: float = 315, altitude: float = 45, z_factor: float = 1
) -> np.ndarray:
"""
Create a hillshade from the DEM.
Adapted from https://github.com/royalosyin/Work-with-DEM-data-using-Python-
from-Simple-to-Complicated/blob/master/ex07-Hillshade%20from%20a%20Digital
%20Elevation%20Model%20(DEM).ipynb
Parameters
----------
azimuth
The desired azimuth for the hillshade.
altitude
The desired sun angle altitude for the hillshade.
z_factor
The z factor to amplify the relief.
Returns
-------
np.ndarray
2D array of hillshade values (0-255) representing shaded
relief visualization.
"""
x, y = np.gradient(self.catchment.dem.read(1))
x_pixel_size = self.catchment.get_dem_x_resolution()
y_pixel_size = self.catchment.get_dem_y_resolution()
if azimuth > 360.0:
raise ConfigurationError(
"Azimuth value should be less than or equal to 360 degrees",
parameter_name="azimuth",
parameter_value=azimuth,
reason="Value out of valid range",
)
if altitude > 90.0:
raise ConfigurationError(
"Altitude value should be less than or equal to 90 degrees",
parameter_name="altitude",
parameter_value=altitude,
reason="Value out of valid range",
)
# Account for the pixel size
x = z_factor * x / x_pixel_size
y = z_factor * y / y_pixel_size
azimuth = 360.0 - azimuth
azimuth_rad = azimuth * np.pi / 180.0
altitude_rad = altitude * np.pi / 180.0
slope = np.pi / 2.0 - np.arctan(np.sqrt(x * x + y * y))
aspect = np.arctan2(-x, y)
shaded = np.sin(altitude_rad) * np.sin(slope) + np.cos(altitude_rad) * np.cos(
slope
) * np.cos((azimuth_rad - np.pi / 2.0) - aspect)
return 255 * (shaded + 1) / 2