Source code for virtual_ecosystem.models.abiotic.abiotic_tools

"""The ``models.abiotic.abiotic_tools`` module contains a set of general functions that
are shared across submodules in the
:mod:`~virtual_ecosystem.models.abiotic.abiotic_model` model.

TODO cross-check with pyrealm for duplication/ different implementation
TODO change temperatures to Kelvin
"""  # noqa: D205

from collections.abc import Iterable
from types import SimpleNamespace

import bottleneck as bn
import numpy as np
from numpy.typing import NDArray
from pyrealm.constants import CoreConst as PyrealmCoreConst
from pyrealm.core.hygro import calc_vp_sat
from xarray import DataArray

from virtual_ecosystem.core.core_components import LayerStructure
from virtual_ecosystem.core.data import Data
from virtual_ecosystem.models.abiotic.wind import next_valid_below


[docs] def build_indices(data: Data, layer_structure: LayerStructure) -> SimpleNamespace: """Build indices for different layers and variables for easier access. Args: data: Data object layer_structure: Layer structure object Returns: SimpleNamespace with indices for different layers and variables """ return SimpleNamespace( above=layer_structure.index_above, canopy=layer_structure.index_filled_canopy, surface=layer_structure.index_surface_scalar, atm=layer_structure.index_filled_atmosphere, flux=layer_structure.index_flux_layers, soil=layer_structure.index_all_soil, topsoil=layer_structure.index_topsoil_scalar, layers=layer_structure.n_layers, cell_id=data.grid.n_cells, )
[docs] def calculate_molar_density_air( temperature: NDArray[np.floating], atmospheric_pressure: NDArray[np.floating], standard_mole: float, standard_pressure: float, celsius_to_kelvin: float, ) -> NDArray[np.floating]: """Calculate temperature-dependent molar density of air. Implementation after :cite:t:`maclean_microclimc_2021`. Args: temperature: Air temperature, [C] atmospheric_pressure: Atmospheric pressure, [kPa] standard_mole: Moles of ideal gas in 1 m^3 air at standard atmosphere standard_pressure: Standard atmospheric pressure, [kPa] celsius_to_kelvin: Factor to convert temperature in Celsius to absolute temperature in Kelvin Returns: molar density of air, [mol m-3] """ temperature_kelvin = temperature + celsius_to_kelvin return ( standard_mole * (atmospheric_pressure / standard_pressure) * (celsius_to_kelvin / temperature_kelvin) )
[docs] def calculate_air_density( air_temperature: NDArray[np.floating], atmospheric_pressure: NDArray[np.floating], specific_gas_constant_dry_air: float, celsius_to_kelvin: float, ): """Calculate the density of air using the ideal gas law. Args: air_temperature: Air temperature, [C] atmospheric_pressure: Atmospheric pressure, [kPa] specific_gas_constant_dry_air: Specific gas constant for dry air, [J kg-1 K-1] celsius_to_kelvin: Factor to convert temperature in Celsius to absolute temperature in Kelvin Returns: density of air, [kg m-3]. """ # Convert temperature from Celsius to Kelvin temperature_k = air_temperature + celsius_to_kelvin # Calculate density using the ideal gas law return ( atmospheric_pressure * 1000.0 / (temperature_k * specific_gas_constant_dry_air) )
[docs] def calculate_latent_heat_vapourisation( temperature: NDArray[np.floating], celsius_to_kelvin: float, latent_heat_vap_equ_factors: tuple[float, float], ) -> NDArray[np.floating]: """Calculate latent heat of vapourisation. Implementation after Eq. 8, :cite:t:`henderson-sellers_new_1984`. Args: temperature: Air temperature, [C] celsius_to_kelvin: Factor to convert temperature in Celsius to absolute temperature in Kelvin latent_heat_vap_equ_factors: Factors in calculation of latent heat of vapourisation Returns: latent heat of vapourisation, [kJ kg-1] """ temperature_kelvin = temperature + celsius_to_kelvin a, b = latent_heat_vap_equ_factors return (a * (temperature_kelvin / (temperature_kelvin - b)) ** 2) / 1000.0
[docs] def find_last_valid_row(array: NDArray[np.floating]) -> NDArray[np.floating]: """Find last valid value in array for each column. This function looks for the last valid value in each column of a 2-dimensional array. If the previous value is nan, it moved up the array. If all values are NaN, the value is set to NaN, too. Args: array: Two-dimesional array for which last valid values should be found Returns: Array that contains last valid values """ # Initialize an empty list to store the last valid value from each column new_row = [] # Loop through each column for column in range(array.shape[1]): # Scan from the last row to the first in the current column for i in range(array.shape[0] - 1, -1, -1): if not np.isnan(array[i, column]): # Append the last valid value found in the column to the new_row list new_row.append(array[i, column]) break else: # If no valid value is found in the column, append NaN new_row.append(np.nan) return np.array(new_row)
[docs] def calculate_slope_of_saturated_pressure_curve( temperature: NDArray[np.floating], saturated_pressure_slope_parameters: tuple[float, float, float, float], ) -> NDArray[np.floating]: r"""Calculate slope of the saturated pressure curve. Args: temperature: Temperature, [C] saturated_pressure_slope_parameters: List of parameters to calculate the slope of the saturated vapour pressure curve Returns: Slope of the saturated pressure curve, :math:`\Delta_{v}` """ a, b, c, d = saturated_pressure_slope_parameters return ( a * (b * np.exp(c * temperature / (temperature + d))) / (temperature + d) ** 2 )
[docs] def calculate_actual_vapour_pressure( air_temperature: DataArray, relative_humidity: DataArray, pyrealm_core_constants: PyrealmCoreConst, ) -> DataArray: """Calculate actual vapour pressure, [kPa]. Args: air_temperature: Air temperature, [C] relative_humidity: Relative humidity, [-] pyrealm_core_constants: Set of constants from pyrealm Returns: actual vapour pressure, [kPa] """ saturation_vapour_pressure_air = calc_vp_sat( ta=air_temperature.to_numpy(), core_const=pyrealm_core_constants, ) return saturation_vapour_pressure_air * relative_humidity / 100.0
[docs] def set_unintended_nan_to_zero( input_array: NDArray[np.floating], input_nan_mask: NDArray[np.bool], ) -> NDArray[np.floating]: """Clean up outputs: set unintended NaNs to 0, preserve intended NaNs. Args: input_array: Input array that may contain NaN input_nan_mask: A mask of intended NaN Returns: Array with unintended NaN set to zero """ arr_clean = np.where(np.isnan(input_array), 0.0, input_array) arr_clean[input_nan_mask] = np.nan return arr_clean
[docs] def compute_layer_thickness_for_varying_canopy( heights: NDArray[np.floating], ) -> NDArray[np.floating]: """Calculate layer thickness for varying canopy layers, true layers only. Calculate layer thickness by subtracting from the next valid layer below (skipping NaNs), and for the last valid layer in each column subtract from zero (ground level). Args: heights: 2D array of layer heights, [m] Returns: 2D array of layer thickness, [m], same shape as input """ # Fill np.nan values from the bottom up along the 0 axis nan_filled_heights = np.flipud(bn.push(np.flipud(heights), axis=0)) # Find the differences down through the canopy to the surface and swap sign canopy_thickness = -np.diff(nan_filled_heights, axis=0) # Add the surface layer heights down to zero thickness = np.concat([canopy_thickness, heights[[-1], :]]) # Mask out the filled layers - should all be zero and match the original np.nans thickness = np.where(np.isnan(heights), np.nan, thickness) return thickness
[docs] def compute_aboveground_layer_thickness( heights: NDArray[np.floating], ) -> NDArray[np.floating]: """Calculate layer thickness for above ground layers only. Calculate layer thickness by subtracting from the next valid layer below (skipping NaNs), and for the last valid layer in each column subtract from zero (ground level). Soil layers are set to NaN. Args: heights: 2D array of layer heights, [m] Returns: 2D array of layer thickness, [m], same shape as input """ n_layers, n_cols = heights.shape thickness = np.full_like(heights, np.nan) # only above ground above_ground = heights > 0 valid = above_ground & ~np.isnan(heights) # next valid below above ground above_ground_heights = np.where(above_ground, heights, np.nan) below_idx = next_valid_below(above_ground_heights) cols = np.broadcast_to(np.arange(n_cols), (n_layers, n_cols)) mask = valid & (below_idx >= 0) h_top = heights[mask] h_bot = heights[below_idx[mask], cols[mask]] thickness[mask] = np.abs(h_top - h_bot) # last above ground layer → ground mask_last = valid & (below_idx < 0) thickness[mask_last] = np.abs(heights[mask_last]) return thickness
[docs] def calculate_specific_humidity( air_temperature: NDArray[np.floating], relative_humidity: NDArray[np.floating], atmospheric_pressure: NDArray[np.floating], molecular_weight_ratio_water_to_dry_air: float, pyrealm_core_constants: PyrealmCoreConst, ) -> NDArray[np.floating]: """Calculate specific humidity. Args: air_temperature: Air temperature, [C] relative_humidity: Relative humidity, [%] atmospheric_pressure: Atmospheric pressure, [kPa] molecular_weight_ratio_water_to_dry_air: The ratio of the molar mass of water vapour to the molar mass of dry air pyrealm_core_constants: Pyrealm core constants Returns: Specific humidity, [kg kg-1] """ # Saturation vapor pressure saturation_vapour_pressure = calc_vp_sat( ta=air_temperature, core_const=pyrealm_core_constants, ) # Actual vapor pressure (hPa) actual_vapour_pressure = (relative_humidity / 100.0) * saturation_vapour_pressure # Specific humidity formula specific_humidity = ( molecular_weight_ratio_water_to_dry_air * actual_vapour_pressure ) / ( atmospheric_pressure - ((1 - molecular_weight_ratio_water_to_dry_air) * actual_vapour_pressure) ) return specific_humidity
[docs] def update_profile_from_reference( layer_structure: LayerStructure, mask_variable: DataArray, variable_name: DataArray, time_index: int, ) -> DataArray: """Update a layer-based profile for a given time index using a reference variable. This function - extracts a mask from air temperature to determine valid atmosphere layers - reads the reference variable at the given time index - applies the mask to keep only valid layers - fills the profile template for those layers Args: layer_structure: LayerStructure object defining the layer setup mask_variable: DataArray used to create the atmospheric mask variable_name: Reference variable (e.g. data["atmospheric_pressure_ref"]) time_index: Index of the current time step Returns: Updated layer profile as a DataArray """ # Create atmospheric mask for filling constant values atm_mask = ~np.isnan( mask_variable.isel(layers=layer_structure.index_filled_atmosphere) ) # Mean atmospheric pressure profile, [kPa] profile_out = layer_structure.from_template() reference_values = variable_name.isel(time_index=time_index) valid_values = reference_values.where(atm_mask) profile_out[layer_structure.index_filled_atmosphere] = valid_values return profile_out
[docs] def calculate_atmospheric_layer_geometry( data: Data, idx: SimpleNamespace, minimum_mixing_depth: float ) -> dict[str, NDArray[np.floating]]: """Calculate heights, layer thickness, and midpoints for atmospheric layers. The midpoint values are distances in metres above ground for each cell. For layer heights below the surface layer, a minimum mixing depth is introduced. This is to prevent unrealistically low mixing depths and therefore high temperatures in the lowest canopy layer when the layer height is very low. Note that this is an artificial inflation of the mixing depth. Args: data: Data object idx: SimpleNamespace containing layer indices minimum_mixing_depth: Minimum depth for lowest canopy layer, [m] Returns: dict containing heights, thickness, layer_top, layer_midpoints """ # Extract above-ground layer heights and correct of lowest canopy layer is below # surface layer height heights = data["layer_heights"].to_numpy().copy() # Set minimum mixing depth heights[idx.canopy] = np.where( heights[idx.canopy] <= minimum_mixing_depth, minimum_mixing_depth, heights[idx.canopy], ) heights_corrected = np.where( np.isnan(data["layer_heights"].to_numpy()), np.nan, heights ) # Compute thickness thickness = compute_aboveground_layer_thickness(heights=heights_corrected) # Compute the midpoint of each layer as the height above ground minus half the layer # thickness midpoints = heights_corrected - thickness / 2 return { "heights": heights, "thickness": thickness, "layer_midpoints": midpoints, }
[docs] def generate_diurnal_cycle_from_monthly_data( monthly_air_temperature: NDArray[np.floating], monthly_shortwave_absorption: NDArray[np.floating], monthly_relative_humidity: NDArray[np.floating], monthly_evapotranspiration: NDArray[np.floating], monthly_soil_evaporation: NDArray[np.floating], latitude_deg: float, month: int, days: int, daily_temp_amplitude: float = 5.0, ) -> dict[str, NDArray[np.floating]]: """Generate synthetic hourly forcing for one day from monthly averages. Args: monthly_air_temperature: Monthly mean air temperature [C] monthly_shortwave_absorption: Monthly mean daily shortwave absorption [W m-2] monthly_relative_humidity: Monthly mean relative humidity [%] monthly_evapotranspiration: Monthly total evapotranspiration [mm/month] monthly_soil_evaporation: Monthly total soil evaporation [mm/month] latitude_deg: Latitude for daylength calculation [deg] month: Month number [1-12] days: Number of days in month daily_temp_amplitude: typical diurnal temperature swing [C] Returns: dict of arrays air_temperature_hourly, shortwave_absorption_hourly, relative_humidity_hourly, evapotranspiration_hourly, soil_evaporation_hourly """ hours_per_day = 24 hours = np.arange(hours_per_day) # Air temperature (sine wave, max at 14:00), (hours_per_day, cell) air_temperature_hourly = monthly_air_temperature[ None, : ] + daily_temp_amplitude * np.sin(2 * np.pi * (hours[:, None] - 8) / hours_per_day) # Daylength (simple climatology) daylength = 12 + 4 * np.cos((month - 1) * np.pi / 6) * np.cos( np.radians(latitude_deg) ) daylength = np.clip(daylength, 6.0, 18.0) sunrise = 12 - daylength / 2 sunset = 12 + daylength / 2 # Shortwave radiation (half-sine over daylight), shape (hours_per_day,) hour_fraction = np.zeros(hours_per_day) for h in range(hours_per_day): if sunrise <= h <= sunset: hour_fraction[h] = np.sin(np.pi * (h - sunrise) / daylength) # Normalise so daytime hours sum to 1 — night hours remain 0 if hour_fraction.sum() > 0: hour_fraction /= hour_fraction.sum() # Shortwave absorption (hours_per_day, layer, cell) shortwave_absorption_hourly = ( monthly_shortwave_absorption[None, :, :] * hour_fraction[:, None, None] ) # Relative humidity (constant vapour pressure approach) e_s_mean = calc_vp_sat(monthly_air_temperature) e_a = monthly_relative_humidity / 100.0 * e_s_mean e_s_hourly = calc_vp_sat(air_temperature_hourly) relative_humidity_hourly = np.clip(100.0 * e_a[None, :] / e_s_hourly, 0.0, 100.0) # Evapotranspiration — conserve monthly total, distribute by radiation. # hour_fraction is already normalised to sum=1 over daytime hours and # is 0 at night, so multiplying by it gives zero ET at night AND # preserves the daily total exactly. # # Shape: (hours_per_day, layer, cell) monthly_et = monthly_evapotranspiration[None, :, :] # (1, layer, cell) daily_et = monthly_et / days # (1, layer, cell) # Distribute daily ET by hour_fraction (zero at night, sums to 1 over day) evapotranspiration_hourly = daily_et * hour_fraction[:, None, None] # Preserve NaN structure from input (unoccupied canopy layers) canopy_nan_mask = np.isnan(monthly_evapotranspiration) # (layer, cell) evapotranspiration_hourly = np.where( canopy_nan_mask[None, :, :], np.nan, evapotranspiration_hourly, ) # Soil evaporation — same approach, shape (hours_per_day, cell) monthly_soil = monthly_soil_evaporation[None, :] # (1, cell) daily_soil = monthly_soil / days # (1, cell) soil_evaporation_hourly = daily_soil * hour_fraction[:, None] # Preserve NaN structure from input soil_nan_mask = np.isnan(monthly_soil_evaporation) # (cell,) soil_evaporation_hourly = np.where( soil_nan_mask[None, :], np.nan, soil_evaporation_hourly, ) return { "air_temperature_hourly": air_temperature_hourly, "relative_humidity_hourly": relative_humidity_hourly, "shortwave_absorption_hourly": shortwave_absorption_hourly, "evapotranspiration_hourly": evapotranspiration_hourly, "soil_evaporation_hourly": soil_evaporation_hourly, }
[docs] def fill_layer_template( layer_structure: LayerStructure, assignments: list[tuple], ) -> NDArray[np.floating]: """Fill layer template with index values. Args: layer_structure: LayerStructure assignments: list of variable names, indices and values Returns: array with updated indices """ out = layer_structure.from_template().to_numpy() for indices, values in assignments: out[indices, :] = values return out
[docs] def record_hourly_output( hour: int, data_record: dict, hourly_values: dict, ): """Record hourly data. Args: hour: Hour of the day data_record: dict that contains all hourly data hourly_values: Hourly values Returns: updated dict with hour values """ # Assign values to data_record for var, value in hourly_values.items(): if var not in data_record: continue else: data_record[var][hour] = value return data_record
[docs] def mean_to_layers( var: str, index: list[int], data_record: dict, layer_structure: LayerStructure ) -> DataArray: """Return mean value over time for given variable and fill into layer structure. Args: var: Variable name index: List of layer indices to fill data_record: Data record dict layer_structure: LayerStructure object Returns: DataArray with mean values filled into layer structure """ out = layer_structure.from_template() mean_vals = np.nanmean(data_record[var], axis=0) out[index] = mean_vals[index] return out
[docs] def initialize_data_record( variables: dict[str, DataArray], time_dim: int, layers: int, cell_ids: int, ) -> dict[str, NDArray[np.floating]]: """Create a data_record dict with a new leading time dimension. Assumptions are that 1D variables have shape (cell_ids,) and 2D variables have shape (layers, cell_ids). Args: variables : Dictionary of variable names to template arrays time_dim : Size of the new time dimension (e.g. 24) layers : Number of layers cell_ids : Number of cell ids Returns: Dictionary with initialized arrays filled with NaNs. Raises: ValueError: is number of dimensions cannot be matched """ data_record = {} for var, arr in variables.items(): shape: tuple[int, ...] if arr.ndim == 1: shape = (time_dim, cell_ids) elif arr.ndim == 2: shape = (time_dim, layers, cell_ids) else: raise ValueError( f"Unsupported number of dimensions for '{var}': {arr.ndim}" ) data_record[var] = np.full(shape, np.nan) return data_record
[docs] def validate_variables( names: tuple[str, ...], values: dict[str, object], exclude: Iterable[str] = (), ) -> None: """Validate all variables are in update dictionary. Args: names: variable names in output values: variables in hourly update exclude: variable names to ignore in the comparison Returns: None Raises: ValueError: if variable mismatch is detected. """ exclude_set = set(exclude) names_set = set(names) - exclude_set values_set = set(values) - exclude_set missing = names_set - values_set extra = values_set - names_set if missing or extra: raise ValueError( "Variable mismatch detected\n" f"Missing: {sorted(missing)}\n" f"Extra: {sorted(extra)}" )
[docs] def finite_and_within(arr: DataArray, low: float, high: float, name: str) -> None: """Test that values are finite and within bounds. Args: arr: Output DataArray to be tested low: Minimum value high: maximum value name: name of variable Returns: None. Raises: AssertionError: if values are not finite or outside bounds. """ values = np.asarray(arr) finite = np.isfinite(values) assert finite.any(), f"{name} has no finite values" # ignore NaNs min_val = np.nanmin(values) max_val = np.nanmax(values) assert min_val >= low, f"{name} below {low}" assert max_val <= high, f"{name} above {high}"
[docs] def compute_weights_from_absorbed_radiation( radiation: NDArray[np.floating], ) -> NDArray[np.floating]: """Convert a 2D radiation array into normalized weights that sum to 1. Weights sum to 1 along the layer axis (axis=0) for each cell independently, ignoring NaN values. NaN entries in the input remain NaN in the output. Cells where all valid radiation is zero return NaN weights — these are cells with no canopy and no radiation to distribute. Args: radiation: 2D array of absorbed radiation values for each layer and cell Returns: 2D array of normalized weights corresponding to the absorbed radiation """ # Sum over layers per cell, ignoring NaNs, keep dims for broadcasting total = np.nansum(radiation, axis=0, keepdims=True) # shape (1, cells) # Where total is zero, set to NaN so division produces NaN instead of inf # This handles no-canopy cells cleanly without raising safe_total = np.where(total == 0.0, np.nan, total) # Divide — NaN entries stay NaN, zero-total cells become NaN return radiation / safe_total