Source code for hydrobricks.preprocessing.potential_solar_radiation

from __future__ import annotations

import logging
import warnings
from pathlib import Path
from typing import TYPE_CHECKING

import numpy as np

from hydrobricks import rasterio, rxr, xr
from hydrobricks._constants import (
    AIR_MOLAR_MASS,
    ES_ECCENTRICITY,
    ES_SM_AXIS,
    GRAVITY,
    R_GAS,
    SEA_ATM_PRESSURE,
    SEA_HEIGHT,
    SEA_SURFACE_TEMPERATURE,
    SOLAR_CST,
    T_LAPSE_RATE,
    TO_DEG,
    TO_RAD,
)
from hydrobricks._exceptions import (
    ConfigurationError,
    DataError,
    DependencyError,
    ModelError,
)
from hydrobricks._optional import HAS_RASTERIO, HAS_XARRAY

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from hydrobricks.catchment import Catchment

if HAS_RASTERIO:
    from rasterio.enums import Resampling


[docs] class PotentialSolarRadiation: """ A class to handle solar radiation data for a catchment area. """ def __init__(self, catchment: Catchment) -> None: """ Initialize the SolarRadiation class. Parameters ---------- catchment The catchment object. """ self.catchment: Catchment = catchment self.mean_annual_radiation: np.ndarray | None = None
[docs] def calculate_daily_potential_radiation( self, output_path: str, resolution: float | None = None, atmos_transmissivity: float = 0.75, steps_per_hour: int = 4, with_cast_shadows: bool = True, ) -> None: """ Compute the daily mean potential clear-sky direct solar radiation at the DEM surface [W/m²] using Hock (1999)'s equation. It is computed for each day of the year and saved in a netcdf file. Computes radiation accounting for terrain slope, aspect, latitude, and solar declination. Optionally includes cast shadows from surrounding terrain. Parameters ---------- output_path Path to for created daily and annual mean potential clear-sky direct solar radiation files. resolution Desired pixel resolution, default is the DEM resolution. atmos_transmissivity Mean clear-sky atmospheric transmissivity, default is 0.75 (value taken in Hock 1999) steps_per_hour Number of steps per hour to compute the potential radiation, default is 4. with_cast_shadows If True, the cast shadows are taken into account. Default is True. Notes ----- This function is based on the R package TopoSol, authored by Matthew Olson. References ---------- - Original R package: https://github.com/mattols/TopoSol - Hock, R. (1999). A distributed temperature-index ice-and snowmelt model including potential direct solar radiation. J. Glaciol., 45(149), 101-111. """ # Resample the DEM and calculate the slope and aspect dem, masked_dem_data, slope, aspect = ( self.catchment.topography.resample_dem_and_calculate_slope_aspect( resolution, output_path ) ) n_rows = slope.shape[0] n_cols = slope.shape[1] # Pre-compute index grids once to avoid repeated allocation and fragmentation dem_data_shape = dem.read(1).shape xv, yv = np.indices(dem_data_shape, dtype=np.int32) # Create an array with the day of the year (Julian Day) day_of_year = np.arange(1, 367) # Get some catchment attributes mean_elevation = self.catchment.topography.get_mean_elevation() mean_lat, _ = self.catchment.extract_unit_mean_lat_lon(self.catchment.dem_data) lat_rad = mean_lat * TO_RAD # Compute the solar declination solar_declin = self.get_solar_declination_rad(day_of_year) # Compute the hour angle starting value ha_limit = self.get_solar_hour_angle_limit(solar_declin, lat_rad) # Time intervals (360°/24h = 15° per hour, divided by the # of steps / hour) time_interval = (15 / steps_per_hour) * TO_RAD # Create array for the daily potential radiation daily_radiation = np.full((len(day_of_year), n_rows, n_cols), np.nan) # Loop over the days of the year for i in range(len(day_of_year)): # Print every 10 days if day_of_year[i] % 10 == 0: logger.debug(f"Computing radiation for day {day_of_year[i]}") # List of hour angles throughout the day. ha_list = np.arange( -ha_limit[i], ha_limit[i] + time_interval, time_interval ) # Compute the zenith and azimuth zenith = self.get_solar_zenith(ha_list, lat_rad, solar_declin[i]) azimuth = self.get_solar_azimuth_to_south(ha_list, lat_rad, solar_declin[i]) # Potential radiation over the time intervals inter_pot_radiation = np.full((len(ha_list), n_rows, n_cols), np.nan) for j in range(len(zenith)): # Shorten computation time, because if zenith >= 90 : no irradiation. if zenith[j] >= 90: inter_pot_radiation[j, :, :] = np.zeros_like(zenith[j]) continue incidence_angle = self._calculate_angle_of_incidence( zenith[j], slope, azimuth[j], aspect ) potential_radiation = self._calculate_radiation_hock_equation( mean_elevation, atmos_transmissivity, day_of_year[i], zenith[j], incidence_angle, ) # Account for cast shadows if with_cast_shadows: cast_shadows = self.calculate_cast_shadows( dem, masked_dem_data, zenith[j], azimuth[j], xv, yv ) potential_radiation = potential_radiation * (1 - cast_shadows) inter_pot_radiation[j, :, :] = potential_radiation.copy() del potential_radiation if with_cast_shadows: del cast_shadows with warnings.catch_warnings(): # This function throws a warning for the first slides of nanmean, # it is normal and due to the NaN bands at the sides of the # slope rasters, etc. warnings.filterwarnings(action="ignore", message="Mean of empty slice") daily_radiation[i, :, :] = np.nansum(inter_pot_radiation, axis=0) daily_radiation[i, :, :] /= 24 * steps_per_hour # Mean annual potential radiation mean_annual_radiation = np.full((n_rows, n_cols), np.nan) mean_annual_radiation[:, :] = np.nanmean(daily_radiation, axis=0) self.upscale_and_save_mean_annual_radiation_rasters( mean_annual_radiation, dem, output_path ) # Put the mask back on (we need the surrounding topography in the steps before) # And make sure the padding lines are also set to nans and not 0 mean_annual_radiation[np.isnan(masked_dem_data)] = np.nan mean_annual_radiation[0, :] = np.nan mean_annual_radiation[-1, :] = np.nan mean_annual_radiation[:, 0] = np.nan mean_annual_radiation[:, -1] = np.nan # Save the daily potential radiation to a netcdf file self._save_potential_radiation_netcdf( daily_radiation, dem, masked_dem_data, day_of_year, output_path ) # If DEM is the downsampled one, close it if dem.res[0] != self.catchment.get_dem_x_resolution(): dem.close()
[docs] def load_mean_annual_radiation_raster( self, dir_path: str, filename: str = "annual_potential_radiation.tif" ) -> None: """ Load the mean annual radiation raster. Loads a pre-computed mean annual radiation raster from a file and stores it in the mean_annual_radiation attribute. Parameters ---------- dir_path Path to the input file directory. filename Name of the input file. Default is 'annual_potential_radiation.tif'. """ with rxr.open_rasterio(Path(dir_path) / filename) as _raw: self.mean_annual_radiation = _raw.drop_vars("band")[0].load()
[docs] def upscale_and_save_mean_annual_radiation_rasters( self, mean_annual_radiation: np.ndarray, dem: rasterio.Dataset, output_path: str, output_filename: str = "annual_potential_radiation.tif", ) -> None: """ Save the mean annual radiation rasters (downsampled and at DEM resolution) to a file. Upscales radiation from a coarser resolution to the DEM resolution using bilinear interpolation if necessary, and saves both the intermediate downsampled raster and the final raster at DEM resolution. Parameters ---------- mean_annual_radiation Downsampled mean annual radiation. dem The DEM at the resolution of the radiation. output_path Path to the output directory. output_filename Name of the output file. Default is 'annual_potential_radiation.tif'. """ # Create the profile profile = dem.profile # Define the output paths temp_path = Path(output_path) / "downsampled_annual_potential_radiation.tif" res_path = Path(output_path) / output_filename # If both resolutions are the same, just save the mean annual radiation if dem.res[0] == self.catchment.get_dem_x_resolution(): with rasterio.open(res_path, "w", **profile) as dst: dst.write(mean_annual_radiation, 1) self.mean_annual_radiation = mean_annual_radiation return # Save a temporary file to upscale the mean annual radiation with rasterio.open(temp_path, "w", **profile) as dst: dst.write(mean_annual_radiation, 1) # Upscale the mean annual radiation to the DEM resolution with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=UserWarning) # pyproj with rxr.open_rasterio(temp_path).drop_vars("band")[0] as xr_dem: xr_dem_upscaled = xr_dem.rio.reproject( xr_dem.rio.crs, shape=self.catchment.dem.shape, resampling=Resampling.bilinear, ) xr_dem_upscaled.rio.to_raster(res_path) self.mean_annual_radiation = xr_dem_upscaled.to_numpy() xr_dem_upscaled.close()
[docs] @staticmethod def calculate_cast_shadows( dem_dataset: rasterio.Dataset, masked_dem: np.ndarray, zenith: float, azimuth: float, xv: np.ndarray = None, yv: np.ndarray = None, ) -> np.ndarray: """ Calculate the cast shadows on terrain from a given sun position. The approach relies on tilting the DEM to obtain a horizon matching the sun rays and filling the DEM. The algorithm is applied to the whole topography before masking the areas outside the catchment. Parameters ---------- dem_dataset Opened rasterio DEM dataset. masked_dem DEM array masked for catchment extent. zenith Solar zenith angle in degrees. azimuth Solar azimuth angle in degrees from north. xv Pre-computed x grid for the DEM, to avoid repeated allocation. If None, it will be computed inside the function. Default is None. yv Pre-computed y grid for the DEM, to avoid repeated allocation. If None, it will be computed inside the function. Default is None. Returns ------- np.ndarray Binary mask where 1 indicates shadowed areas and 0 indicates sunlit areas. Notes ----- The algorithm is applied to the whole topography before masking the areas outside the catchment. """ dem = dem_dataset.read(1) x_size = abs(dem_dataset.res[0]) y_size = abs(dem_dataset.res[1]) if x_size != y_size: raise DataError( "The DEM x and y resolutions must be equal " "for computing the cast shadows.", data_type="DEM", reason="Unequal x and y resolutions", ) if zenith >= 90: cast_shadows = np.ones(dem.shape) cast_shadows[np.isnan(masked_dem)] = np.nan return cast_shadows # Get mean pixel size pixel_size = (x_size + y_size) / 2 # Get the x and y grids for the DEM if xv is None or yv is None: xv, yv = np.indices(dem.shape, dtype=np.int32) # Get the base arrays for the solar ray IDs and the offset distances if azimuth < -135: # NNE ref_grid = xv base_ray_ids = yv[:, ::-1] angle = np.tan(-(azimuth + 90) * TO_RAD) elif azimuth < -90: # ENE ref_grid = yv[:, ::-1] base_ray_ids = xv angle = np.tan((azimuth + 180) * TO_RAD) elif azimuth < -45: # ESE ref_grid = yv[:, ::-1] base_ray_ids = xv[::-1, :] angle = np.tan(-azimuth * TO_RAD) elif azimuth < 0: # SSE ref_grid = xv[::-1, :] base_ray_ids = yv[:, ::-1] angle = np.tan((90 + azimuth) * TO_RAD) elif azimuth < 45: # SSW ref_grid = xv[::-1, :] base_ray_ids = yv angle = np.tan((90 - azimuth) * TO_RAD) elif azimuth < 90: # WSW ref_grid = yv base_ray_ids = xv[::-1, :] angle = np.tan(azimuth * TO_RAD) elif azimuth < 135: # WNW ref_grid = yv base_ray_ids = xv angle = np.tan((180 - azimuth) * TO_RAD) else: # NNW ref_grid = xv base_ray_ids = yv angle = np.tan((azimuth - 90) * TO_RAD) # Computation of the solar ray paths for each pixel (rays with different IDs). ray_ids = base_ray_ids - ref_grid / angle # Adding the offset (from azimuth) ray_ids = ray_ids - np.nanmin(ray_ids) # Set all IDs to positive values ray_ids = ray_ids.astype(int) + 1 # Convert to int # Compute the tilted DEM orthogonal_distance = ref_grid + base_ray_ids / angle # Projected distance tilt_heights = orthogonal_distance * pixel_size / np.tan(zenith * TO_RAD) tilted_dem = dem + tilt_heights # DEM after tilt of zenith °, azimuthal dir. # Remapping of the solar rays on another matrix to process them. mapped = np.ones((np.nanmax(ray_ids) + 1, np.max(ref_grid) + 1)) mapped = mapped * np.nan # Mapping of the tilted DEM from solar rays ... mapped[ray_ids, ref_grid] = tilted_dem # ... to row/columns. accumulated = np.fmax.accumulate(mapped, axis=1) cast_sh = (accumulated > mapped).astype(float) cast_shadows = cast_sh[ray_ids, ref_grid] # Put the mask back on (we previously needed the surrounding topography). cast_shadows[np.isnan(masked_dem)] = np.nan return cast_shadows
[docs] @staticmethod def get_solar_declination_rad(day_of_year: int | np.ndarray) -> float: """ Compute the solar declination. The solar declination is the angle between the rays of the Sun and the plane of the Earth's equator. It represents how much the sun is tilted towards or away from the observer's latitude. The calculation involves trigonometric functions to account for the observer's latitude and the position of the sun in the sky. Parameters ---------- day_of_year Day of the year (1-366). Returns ------- The solar declination in radians """ # Normalized day of the year # '(jd-172)' calculates the number of days that have passed since the summer # solstice (around June 21st), which is day 172 in a non-leap year. # '(360*(jd-172))/365' normalizes this count to a value representing the # position in the orbit of the Earth around the Sun (in degrees). ndy = (360 * (day_of_year - 172)) / 365 # The cos(...) function is applied to the normalized day of the year. It # produces values between -1 and 1, representing the variation in solar # declination throughout the year. The constant factor 23.45 represents the # tilt of the Earth's axis relative to its orbital plane. This tilt causes the # variation in the angle of the sun's rays reaching different latitudes # on Earth. solar_declin = 23.45 * np.cos(ndy * TO_RAD) * TO_RAD return solar_declin
[docs] @staticmethod def get_solar_hour_angle_limit( solar_declination: float | np.ndarray, lat_rad: float, ) -> float | np.ndarray: """ Compute the hour angle limit value (min/max). The hour angle is the angular distance between the sun and the observer's meridian. It is typically measured in degrees. The tangent of solar_declin/lat_rad represents the ratio of the opposite side (vertical component of the sun's rays) to the adjacent side (horizontal component). It helps capture how much the sun/the observer's location is tilted north or south relative to the equator. Parameters ---------- solar_declination Solar declination in radians. lat_rad Latitude in radians. Returns ------- The limit value of the hour angle in radians. """ # The negative sign is applied because the Hour Angle is negative in the # morning and positive in the afternoon. hour_angle = np.arccos(-np.tan(solar_declination) * np.tan(lat_rad)) return hour_angle
[docs] @staticmethod def get_solar_zenith( hour_angles: float | np.ndarray, lat_rad: float, solar_declination: float ) -> float | np.ndarray: """ Compute the solar zenith. The Solar zenith (IQBAL 2012) is the angle between the sun and the vertical (zenith) position directly above the observer. The result is expressed in degrees. The calculation involves trigonometric functions to account for the observer's latitude, the solar declination, and the position of the sun in the sky (represented by the Hour Angles). Parameters ---------- hour_angles Hour angle(s). lat_rad Latitude in radians. solar_declination Solar declination in radians. Returns ------- The solar zenith in degrees. """ zenith = ( np.arccos( (np.sin(lat_rad) * np.sin(solar_declination)) + (np.cos(lat_rad) * np.cos(solar_declination) * np.cos(hour_angles)) ) * TO_DEG ) return zenith
[docs] @staticmethod def get_solar_azimuth_to_south( hour_angles: float | np.ndarray, lat_rad: float, solar_declination: float ) -> np.ndarray: """ Compute the solar azimuth relative to the south. The solar azimuth is the angle between the sun and the observer's meridian. It is typically measured in degrees. Azimuth with negative values before solar noon and positive values with positive values after solar noon. Solar noon is defined by the change in sign of the hour angle (negative in the morning, positive in the afternoon). From https://www.astrolabe-science.fr/diagramme-solaire-azimut-hauteur Parameters ---------- hour_angles Array with the hour angles. lat_rad Latitude in radians. solar_declination Solar declination. Returns ------- The solar azimuth in degrees. """ convert_to_float = False if isinstance(hour_angles, (int, float)): hour_angles = np.array([hour_angles]) convert_to_float = True azimuth = np.degrees( np.arctan( np.sin(hour_angles) / ( np.sin(lat_rad) * np.cos(hour_angles) - np.cos(lat_rad) * np.tan(solar_declination) ) ) ) azimuth[np.where((azimuth < 0) & (hour_angles > 0))] += 180 azimuth[np.where((azimuth > 0) & (hour_angles < 0))] -= 180 if convert_to_float: azimuth = azimuth[0] return azimuth
[docs] @staticmethod def get_solar_azimuth_to_north( hour_angles: float | np.ndarray, lat_rad: float, solar_declination: float ) -> float | np.ndarray: """ Compute the solar azimuth relative to the north. See get_solar_azimuth_to_south() for more details. """ azimuth = PotentialSolarRadiation.get_solar_azimuth_to_south( hour_angles, lat_rad, solar_declination ) azimuth += 180 return azimuth
@staticmethod def _calculate_radiation_hock_equation( elevation: float, atmos_transmissivity: float, day_of_year: int | float, zenith: float, incidence_angle: np.array, ) -> np.array: """ Hock (2005) equation to compute the potential clear-sky direct solar radiation at the ice or snow surface [W/m²]. Parameters ---------- elevation Height above sea level [m] atmos_transmissivity Mean clear-sky atmospheric transmissivity day_of_year Day of the year (Julian Day) zenith Solar zenith for one moment during the day (IQBAL 2012) incidence_angle Angle of incidence between the normal to the grid slope and the solar beam Returns ------- The potential clear-sky direct solar radiation at the ice or snow surface [W/m²] Notes ----- This function is based on the R package TopoSol, authored by Matthew Olson. References ---------- - Original R package: https://github.com/mattols/TopoSol """ # True anomaly (the angle subtended at the Sun between the semi major # axis line and the current position) # Different definition here: # https://physics.stackexchange.com/questions/177949/earth-sun-distance-on-a-given-day-of-the-year theta = (365.5 / 360) * day_of_year # Current Sun-Earth distance (computed using the modern version of # Kepler's first law) current_se_dist = (ES_SM_AXIS * (1 - ES_ECCENTRICITY * ES_ECCENTRICITY)) / ( 1 + ES_ECCENTRICITY * np.cos(theta * TO_RAD) ) # Atmospheric pressure local_pressure = SEA_ATM_PRESSURE * ( 1 + (T_LAPSE_RATE / SEA_SURFACE_TEMPERATURE) * (elevation - SEA_HEIGHT) ) ** ((-GRAVITY * AIR_MOLAR_MASS) / (R_GAS * T_LAPSE_RATE)) # Hock equation (Hock, 1999) to compute the potential # clear-sky direct solar radiation if zenith > 90 - 1e-10: empty_matrix = np.zeros(incidence_angle.shape) # Set border values to nan empty_matrix[0, :] = np.nan empty_matrix[-1, :] = np.nan empty_matrix[:, 0] = np.nan empty_matrix[:, -1] = np.nan return empty_matrix solar_radiation = ( SOLAR_CST * ((ES_SM_AXIS / current_se_dist) ** 2) * atmos_transmissivity ** (local_pressure / (SEA_ATM_PRESSURE * np.cos(zenith * TO_RAD))) * np.cos(incidence_angle) ) return solar_radiation @staticmethod def _calculate_angle_of_incidence( zenith: float, slope: float, azimuth: float, aspect: float, tolerance: float = 10 ** (-6), ) -> np.ndarray: """ Calculate the angle of incidence. Computes the angle between the solar ray direction and the normal to the terrain surface, accounting for both terrain slope and aspect relative to the sun's position. Parameters ---------- zenith Solar zenith (IQBAL 2012), in degrees slope Slope of the DEM, in degrees azimuth Azimuth for ZSLOPE CALC, in degrees aspect Aspect of the DEM, in degrees tolerance Error tolerance in the trigonometric computations. Returns ------- np.ndarray Angle of incidence in radians (maximum 90°). Notes ----- This function is based on the R package TopoSol, authored by Matthew Olson. References ---------- - Original R package: https://github.com/mattols/TopoSol """ # Solar zenith and azimuth on a slope zenith_rad = zenith * TO_RAD slope_rad = slope * TO_RAD azimuth_rad = azimuth * TO_RAD aspect_rad = (aspect - 180) * TO_RAD cosine_term = (np.cos(zenith_rad) * np.cos(slope_rad)) + ( np.sin(zenith_rad) * np.sin(slope_rad) * np.cos(azimuth_rad - aspect_rad) ) if np.nanmax(np.abs(cosine_term) - 1) < tolerance: incidence_angle = np.arccos(np.clip(cosine_term, -1, 1)) else: raise ConfigurationError( "Argument of arccos is above or below 1.", parameter_name="cosine_term", parameter_value=cosine_term, reason="Value outside [-1, 1] range", ) # Angle of incidence matrix incidence_angle[incidence_angle > 90 * TO_RAD] = 90 * TO_RAD return incidence_angle def _save_potential_radiation_netcdf( self, radiation: np.ndarray, dem: rasterio.Dataset, masked_dem_data: np.ndarray, day_of_year: np.ndarray, output_path: str | None, output_filename: str = "daily_potential_radiation.nc", ) -> None: """ Save the potential radiation to a netcdf file. Saves daily radiation arrays and computes/saves mean annual radiation. Creates two output files: one for daily radiation and one for annual mean. Parameters ---------- radiation 3D array of potential radiation (days × rows × cols) [W/m²]. dem Rasterio dataset of the DEM at radiation resolution. masked_dem_data DEM data masked for catchment extent. day_of_year Array of day numbers (1-366). output_path Path to output directory. output_filename Filename for daily radiation output. Default is 'daily_potential_radiation.nc'. """ full_path = Path(output_path) / output_filename logger.info(f"Saving to {str(full_path)} with CRS {self.catchment.dem.crs}") if not HAS_XARRAY: raise DependencyError( "xarray is required to save potential solar radiation data.", package_name="xarray", operation="PotentialSolarRadiation.save_potential_radiation_netcdf", install_command="pip install xarray", ) rows, cols = np.where(masked_dem_data) xs, ys = rasterio.transform.xy(dem.transform, list(rows), list(cols)) xs = np.array(xs).reshape(masked_dem_data.shape)[0, :] ys = np.array(ys).reshape(masked_dem_data.shape)[:, 0] ds = xr.DataArray( radiation, name="radiation", dims=["day_of_year", "y", "x"], coords={"x": xs, "y": ys, "day_of_year": day_of_year}, ) with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=UserWarning) # pyproj ds.rio.write_crs(self.catchment.dem.crs, inplace=True) ds.x.attrs["axis"] = "X" ds.x.attrs["long_name"] = "x coordinate of projection" ds.x.attrs["standard_name"] = "projection_x_coordinate" ds.x.attrs["units"] = "meter" ds.y.attrs["axis"] = "Y" ds.y.attrs["long_name"] = "y coordinate of projection" ds.y.attrs["standard_name"] = "projection_y_coordinate" ds.y.attrs["units"] = "meter" try: ds.to_netcdf(full_path) logger.info("File successfully written.") except OSError as e: logger.error( f"Error writing netCDF file to {full_path}: {e}", exc_info=True ) raise ModelError(f"Error writing to file: {e}") from e except ValueError as e: logger.error( f"Invalid data for netCDF file {full_path}: {e}", exc_info=True ) raise ModelError(f"Error writing to file: {e}") from e finally: try: ds.close() except (OSError, ValueError, AttributeError) as e: logger.warning(f"Error closing xarray dataset: {e}", exc_info=True)