r"""The ``models.abiotic.energy_balance`` module calculates the energy balance for the
Virtual Ecosystem. Given that the time increments of the model are an hour or longer,
we can assume that below-canopy heat and vapour exchange attain steady state and heat
storage in the canopy does not need to be simulated explicitly.
(For application where very fine-temporal resolution data might be needed, heat and
vapour exchange must be modelled as transient processes, and heat storage by the canopy,
and the exchange of heat between different layers of the canopy, must be considered
explicitly, see :cite:t:`maclean_microclimc_2021`. This is currently not implemented.)
Under steady-state, the balance equation :math:`\frac{dQ}{dt}` for the leaves in each
canopy layer is as
follows (after :cite:t:`maclean_microclimc_2021`):
.. math::
\frac{dQ}{dt}
= R_{abs} - R_{em} - H - \lambda E - PP
= R_{abs} - \epsilon_{l} \sigma T_{l}^{4} - \frac{\rho_{a} c_p}{r_a}(T_{l} - T_{a})
- \lambda g_{v} \frac {e_{l} - e_{a}}{p_{a}} - PP = 0
where :math:`R_{abs}` is absorbed shortwave and longwave radiation, :math:`R_{em}`
emitted radiation, :math:`H`
the sensible heat flux, :math:`\lambda E` the latent heat flux, :math:`\epsilon_{l}` the
emissivity of the leaf, :math:`\sigma` the Stefan-Boltzmann constant, :math:`T_{l}` the
absolute temperature of the leaf, :math:`T_{a}` the absolute temperature of the air
surrounding the leaf, :math:`\lambda` the latent heat of vapourisation of water,
:math:`e_{l}` the effective vapour pressure of the leaf, :math:`e_{a}` the vapour
pressure of air and :math:`p_{a}` atmospheric pressure. :math:`\rho_a` is the density of
air, :math:`c_{p}` is the specific heat capacity of air
at constant pressure, :math:`r_{a}` is the aerodynamic resistance of the surface (leaf
or soil), :math:`g_{v}` represents the conductivity for vapour loss from the leaves as a
function of the stomatal conductivity, :math:`PP` stands for primary productivity.
A challenge in solving this equation is the dependency of latent heat and emitted
radiation on leaf temperature. We use a secant method to iteratively solve the energy
balance for the canopy temperature. The air temperature is updated based on the
sensible heat flux from the canopy and soil in equilibrium, and vertical mixing of air
between layers.
Atmospheric humidity is also mixed vertically between atmospheric layers.
Advection at the top of the canopy is currently not considered as we don't have
have horizontal exchange between grid cells and air above canopy values would be
unrealistic.
""" # noqa: D205, D415
from collections.abc import Callable
from types import SimpleNamespace
from typing import Any
import numpy as np
from numpy.typing import NDArray
from xarray import DataArray
from virtual_ecosystem.core.core_components import LayerStructure
from virtual_ecosystem.core.model_config import CoreConstants
from virtual_ecosystem.models.abiotic.abiotic_tools import (
compute_weights_from_absorbed_radiation,
find_last_valid_row,
set_unintended_nan_to_zero,
)
from virtual_ecosystem.models.abiotic.model_config import AbioticConstants
[docs]
def initialise_canopy_and_soil_fluxes(
air_temperature: DataArray,
layer_structure: LayerStructure,
initial_flux_value: float,
) -> dict[str, DataArray]:
"""Initialise canopy temperature and energy fluxes.
This function initializes the following variables to run the first step of the
energy balance routine: canopy temperature, [C], sensible
and latent heat flux (canopy and soil), and ground heat flux, all in [W m-2].
Args:
air_temperature: Air temperature, [C]
layer_structure: Instance of LayerStructure
light_extinction_coefficient: Light extinction coefficient for canopy, unitless
initial_flux_value: Initial non-zero flux, [W m-2]
Returns:
Dictionary with canopy temperature, [C], sensible and latent heat flux (canopy
and soil), [W m-2], and ground heat flux, [W m-2].
"""
output = {}
# Initialise canopy temperature, equilibrium with surrounding air temperature, [C]
canopy_temperature = layer_structure.from_template()
canopy_temperature[layer_structure.index_filled_canopy] = air_temperature[
layer_structure.index_filled_canopy
]
canopy_temperature[layer_structure.index_surface_scalar] = air_temperature[
layer_structure.index_surface_scalar
]
output["canopy_temperature"] = canopy_temperature
# Base flux template (non-zero minimum)
base_flux = layer_structure.from_template()
base_flux[layer_structure.index_flux_layers] = initial_flux_value
# Fluxes that share the same structure
for name in (
"sensible_heat_flux",
"latent_heat_flux",
"longwave_emission",
):
output[name] = base_flux.copy()
# 1D fluxes (cell-wise)
output["ground_heat_flux"] = DataArray(
np.full(base_flux.sizes["cell_id"], initial_flux_value),
dims="cell_id",
)
return output
[docs]
def calculate_longwave_emission(
temperature: NDArray[np.floating],
emissivity: float | NDArray[np.floating],
stefan_boltzmann: float,
) -> NDArray[np.floating]:
"""Calculate longwave emission using the Stefan Boltzmann law.
According to the Stefan Boltzmann law, the amount of radiation emitted per unit time
from the area of a black body at absolute temperature is directly proportional to
the fourth power of the temperature. Emissivity (which is equal to absorptive power)
lies between 0 to 1.
Args:
temperature: Temperature, [K]
emissivity: Emissivity, dimensionless
stefan_boltzmann: Stefan Boltzmann constant, [W m-2 K-4]
Returns:
Longwave emission, [W m-2]
"""
return emissivity * stefan_boltzmann * temperature**4
[docs]
def calculate_sensible_heat_flux(
density_air: NDArray[np.floating],
specific_heat_air: NDArray[np.floating],
air_temperature: NDArray[np.floating],
surface_temperature: NDArray[np.floating],
aerodynamic_resistance: float | NDArray[np.floating],
) -> NDArray[np.floating]:
r"""Calculate sensible heat flux.
The sensible heat flux :math:`H` is calculated using the following equation:
.. math::
H = \frac{\rho_{a} c_{p}}{r_{a}} (T_{s} - T_{a})
where :math:`\rho_{a}` is the density of air, :math:`c_{p}` is the specific heat
capacity of air at constant pressure, :math:`r_{a}` is the aerodynamic resistance of
the surface, :math:`T_{s}` is the surface temperature, and :math:`T_{a}` is the air
temperature.
Args:
density_air: Density of air, [kg m-3]
specific_heat_air: Specific heat of air, [J kg-1 K-1]
air_temperature: Air temperature, [C]
surface_temperature: Surface temperature (canopy or soil), [C]
aerodynamic_resistance: Aerodynamic resistance, [s m-1]
Returns:
sensible heat flux, [W m-2]
"""
return (density_air * specific_heat_air / aerodynamic_resistance) * (
surface_temperature - air_temperature
)
[docs]
def calculate_absorbed_longwave_radiation(
downward_longwave: NDArray[np.floating],
leaf_area_index: NDArray[np.floating],
leaf_emissivity: float,
soil_emissivity: float,
extinction_coefficient_lw: float,
surface_index: int,
topsoil_index: int,
) -> NDArray[np.floating]:
"""Calculate absorbed longwave radiation per layer using Beer-Lambert attenuation.
Each canopy layer absorbs downward longwave attenuated from above. The surface
layer receives the remainder after full canopy attenuation. The topsoil receives
what the surface layer transmitted.
Upward longwave from the soil is NOT included here — it is already accounted
for in calculate_soil_fluxes via longwave_emission_soil, which drives the
ground heat flux and soil sensible heat flux to the surface air layer.
Including it here would double-count the soil longwave emission.
Args:
downward_longwave: Downward longwave at top of canopy [W m-2], shape (cells,)
leaf_area_index: LAI per layer, shape (layers, cells), NaN for empty layers
leaf_emissivity: Leaf emissivity, dimensionless
soil_emissivity: Soil emissivity, dimensionless
extinction_coefficient_lw: Longwave extinction coefficient, typically ~0.5
surface_index: Row index of surface layer
topsoil_index: Row index of topsoil layer
Returns:
Absorbed longwave radiation per layer, shape (layers, cells) [W m-2].
"""
n_layers, n_cells = leaf_area_index.shape
absorbed = np.zeros((n_layers, n_cells))
cumulative_lai = np.zeros(n_cells)
arriving = downward_longwave.copy().astype(float)
for layer in range(n_layers):
lai = np.nan_to_num(leaf_area_index[layer], nan=0.0)
if layer == surface_index:
# Downward sky LW attenuated through full canopy above
transmittance = np.exp(-extinction_coefficient_lw * cumulative_lai)
arriving_down = downward_longwave * transmittance
absorbed[layer] = leaf_emissivity * arriving_down
# What passes through to soil
arriving = (1.0 - leaf_emissivity) * arriving_down
elif layer == topsoil_index:
# Soil absorbs what the surface layer transmitted
absorbed[layer] = soil_emissivity * arriving
else:
transmittance_above = np.exp(-extinction_coefficient_lw * cumulative_lai)
transmittance_below = np.exp(
-extinction_coefficient_lw * (cumulative_lai + lai)
)
absorbed[layer] = np.where(
lai > 0,
leaf_emissivity
* downward_longwave
* (transmittance_above - transmittance_below),
0.0,
)
cumulative_lai += lai
return absorbed
[docs]
def update_soil_temperature(
ground_heat_flux: NDArray[np.floating],
soil_temperature: NDArray[np.floating],
soil_layer_thickness: NDArray[np.floating],
soil_thermal_conductivity: float | NDArray[np.floating],
soil_bulk_density: float | NDArray[np.floating],
specific_heat_capacity_soil: float | NDArray[np.floating],
time_interval: float,
) -> NDArray[np.floating]:
r"""Update soil temperature using heat diffusion.
The function applies an explicit finite-difference approach to update
soil temperatures based on thermal diffusivity and heat flux.
Governing equations:
Soil thermal diffusivity:
.. math::
\alpha = \frac{\lambda}{\rho_s c_s}
where :math:`\lambda` is the soil thermal conductivity [W m-1 K-1],
:math:`\rho_s` is the soil bulk density [kg m-3], :math:`c_s` is the specific heat
capacity of soil [J kg-1 K-1].
Internal layer update:
.. math::
T_i^{t+\Delta t} = T_i^t + (\Delta t / \Delta z^2)
* \alpha * (T_{i+1}^t - 2T_i^t + T_{i-1}^t)
Top layer update with ground heat flux:
.. math::
T_0^{t+\Delta t} = T_0^t + (\Delta t / (\rho_s c_s \Delta z)) * G
No-heat-flux bottom boundary condition:
.. math::
T_{n-1}^{t+\Delta t} = T_{n-1}^t + (\Delta t / \Delta z^2)
* \alpha * (T_{n-2}^t - T_{n-1}^t)
Args:
ground_heat_flux: Ground heat flux at top soil, [W m-2]
soil_temperature: Soil temperature for each soil layer, [C]
soil_thermal_conductivity: Thermal conductivity of soil, [W m-2 K-1]
soil_bulk_density: Soil bulk density, [kg m-3]
specific_heat_capacity_soil: Specific heat capacity of soil, [J kg-1 K-1]
soil_layer_thickness: Thickness of each soil layer, [m]
time_interval: Time interval, [s]
Returns:
Updated soil temperatures, [C]
"""
n_layers = len(soil_temperature)
# Soil thermal diffusivity, [m2 s-1]
soil_thermal_diffusivity = soil_thermal_conductivity / (
soil_bulk_density * specific_heat_capacity_soil
)
# Update internal layers using diffusion
for i in range(1, n_layers - 1):
soil_temperature[i, :] += (
(time_interval / soil_layer_thickness[i] ** 2)
* soil_thermal_diffusivity
* (
soil_temperature[i + 1, :]
- 2 * soil_temperature[i, :]
+ soil_temperature[i - 1, :]
)
)
# Update top layer with ground heat flux
soil_temperature[0, :] += (
time_interval
/ (soil_bulk_density * specific_heat_capacity_soil * soil_layer_thickness[0])
) * ground_heat_flux
# No heat flux boundary at the bottom (insulation assumption)
soil_temperature[-1, :] += (
(time_interval / soil_layer_thickness[-1] ** 2)
* soil_thermal_diffusivity
* (soil_temperature[-2, :] - soil_temperature[-1, :])
)
return soil_temperature
[docs]
def calculate_energy_balance_residual(
canopy_temperature_initial: NDArray[np.floating],
air_temperature: NDArray[np.floating],
evapotranspiration: NDArray[np.floating],
absorbed_shortwave_radiation: NDArray[np.floating],
absorbed_longwave_radiation: NDArray[np.floating],
specific_heat_air: NDArray[np.floating],
density_air: NDArray[np.floating],
aerodynamic_resistance: NDArray[np.floating],
latent_heat_vapourisation: NDArray[np.floating],
surface_index: int,
leaf_emissivity: float,
stefan_boltzmann_constant: float,
zero_Celsius: float,
seconds_to_hour: float,
return_fluxes: bool,
) -> NDArray[np.floating] | dict[str, NDArray[np.floating]]:
r"""Calculate energy balance residual for canopy.
The energy balance residual (:math:`\frac{dQ}{dt}`) for the canopy is given by:
.. math::
\frac{dQ}{dt} = R_{abs} - \epsilon_{l} \sigma T_{l}^{4} - H - \lambda E - PP
Where :math:`R_abs` is the absorbed shortwave and longwave radiation by the canopy,
:math:`\epsilon_{l}` is the leaf emissivity, :math:`\sigma` is the Stefan-Boltzmann
constant, :math:`T_{l}` is the leaf temperature, :math:`H` is the sensible heat
flux from the canopy, :math:`\lambda E` is the latent heat flux from the canopy,
:math:`PP` is a fraction of the absorbed light is used in photosynthesis (PAR).
Args:
canopy_temperature_initial: Initial leaf temperature for all canopy layers, [C]
air_temperature: Initial air temperature in canopy layers, [C]
evapotranspiration: Evapotranspiration, [mm]
absorbed_shortwave_radiation: Absorbed shortwave radiation for all canopy
layers, [W m-2]
absorbed_longwave_radiation: Absorbed longwave radiation for all canopy layers,
[W m-2]
specific_heat_air: Specific heat capacity of air, [J kg-1 K-1]
density_air: Density of air, [kg m-3]
aerodynamic_resistance: Aerodynamic resistance of canopy, [s m-1]
latent_heat_vapourisation: Latent heat of vapourisation, [J kg-1]
surface_index: Row index of surface layer
leaf_emissivity: Leaf emissivity, dimensionless
stefan_boltzmann_constant: Stefan Boltzmann constant, [W m-2 K-4]
zero_Celsius: Factor to convert between Celsius and Kelvin
seconds_to_hour: Factor to convert between hours and seconds
return_fluxes: Flag to indicate if all components of the energy balance should
be returned. This is false for the newton approach to solve for canopy
temperature, but true to create the outputs in a second call afterwards.
Returns:
full energy balance or energy balance residual, [W m-2]
"""
# Longwave emission from canopy, [W m-2]
longwave_emission_canopy = calculate_longwave_emission(
temperature=canopy_temperature_initial + zero_Celsius,
emissivity=leaf_emissivity,
stefan_boltzmann=stefan_boltzmann_constant,
)
# Sensible heat flux from canopy layers, [W m-2]
sensible_heat_flux_canopy = calculate_sensible_heat_flux(
density_air=density_air,
specific_heat_air=specific_heat_air,
air_temperature=air_temperature,
surface_temperature=canopy_temperature_initial,
aerodynamic_resistance=aerodynamic_resistance,
)
# Latent heat flux canopy, [W m-2]
# The current implementation converts outputs from plant and hydrology model to
# ensure energy conservation between modules for now.
latent_heat_flux_canopy = calculate_latent_heat_flux(
evapotranspiration=evapotranspiration,
latent_heat_vapourisation=latent_heat_vapourisation,
time_interval=seconds_to_hour,
)
# Net radiation, [W m-2]
net_radiation = (
absorbed_shortwave_radiation
+ absorbed_longwave_radiation
- longwave_emission_canopy
)
# Energy balance residual, [W m-2]
energy_balance_residual = (
absorbed_shortwave_radiation
+ absorbed_longwave_radiation
- longwave_emission_canopy
- sensible_heat_flux_canopy
- latent_heat_flux_canopy
)
# Add longwave_emission from surface to net radiation and energy balance residual
energy_balance_residual[1] += 0.5 * longwave_emission_canopy[surface_index]
if return_fluxes:
energy_balance = {
"longwave_emission": longwave_emission_canopy,
"sensible_heat_flux": sensible_heat_flux_canopy,
"latent_heat_flux": latent_heat_flux_canopy,
"energy_balance_residual": energy_balance_residual,
"net_radiation": net_radiation,
}
return energy_balance
else:
return energy_balance_residual
[docs]
def update_canopy_air_temperature(
air_temperature: NDArray[np.floating],
sensible_heat_flux: NDArray[np.floating],
specific_heat_air: NDArray[np.floating],
density_air: NDArray[np.floating],
mixing_layer_thickness: NDArray[np.floating],
) -> NDArray[np.floating]:
r"""Update air temperature surrounding canopy in steady state.
The new air temperature :math:`T_{a}^{new}` is updated following
:cite:t:`bonan_climate_2019`:
.. math ::
H = \frac{\rho_a c_p}{r_a}(T_{l} - T_{a})
and
.. math::
T_{a}^{new} = T_{a}^{old} + \frac{H}{\rho_a c_p z}
where :math:`\rho_{a}` is the density of air, :math:`c_{p}` is the specific heat
capacity of air at constant pressure, :math:`r_{a}` is the aerodynamic resistance of
the surface, :math:`T_{s}` is the surface temperature, :math:`T_{a}` is the air
temperature, and :math:`z` is the thickness of the air layer we are updating.
Args:
air_temperature: Air temperature, [C]
sensible_heat_flux: Sensible heat flux, [W m-2]
specific_heat_air: Specific heat capacity of air, [J kg-1 K-1]
density_air: Density of air, [kg m-3]
mixing_layer_thickness: thickness of the air layer we are updating, [m]
Returns:
updated air temperatures, [C]
"""
# Update air temperature over a layer of height z (e.g., canopy height)
new_air_temperature = air_temperature + (
sensible_heat_flux / (density_air * specific_heat_air * mixing_layer_thickness)
)
return new_air_temperature
[docs]
def update_surface_air_temperature(
canopy_air_temperature: NDArray[np.floating],
state: dict[str, NDArray[np.floating]],
idx: SimpleNamespace,
denominator_tolerance: float,
):
"""Update surface air temperature in equilibrium with soil and canopy fluxes.
The surface air temperature is diagnosed from the soil and canopy bottom
conductances and temperatures, assuming equilibrium between the soil and canopy
fluxes. This is necessary because the surface layer is too thin to be updated based
on fluxes over a 1-hour timestep, and we want to avoid unrealistic surface air
temperatures that would arise from a flux-based update.
For cells with fewer canopy layers, the bottom canopy temperature is the last
finite value in the canopy temperature array. For cells with no canopy, the
above-canopy reference temperature is used instead.
Args:
canopy_air_temperature: Canopy air temperature, [C]
state: Dictionary of state variables
idx: Layer structure index
denominator_tolerance: Small value to prevent division by zero
Returns:
Updated surface air temperature, [C]
"""
# Last finite canopy temperature per cell — bottom-most occupied canopy layer
# Returns NaN for cells with no canopy
canopy_bottom_temperature = find_last_valid_row(canopy_air_temperature)
# For cells with no canopy, fall back to above-canopy reference (row 0)
has_canopy = np.isfinite(canopy_bottom_temperature)
canopy_bottom_temperature = np.where(
has_canopy,
canopy_bottom_temperature,
state["air_temperature"][0], # above-canopy reference
)
# Conductance-weighted average of soil and canopy bottom temperatures
g_soil = 1.0 / np.maximum(
state["aerodynamic_resistance_soil"], denominator_tolerance
)
g_canopy = 1.0 / np.maximum(
state["aerodynamic_resistance_canopy"], denominator_tolerance
)
surface_air_temperature = (
g_soil * state["soil_temperature"][idx.topsoil]
+ g_canopy * canopy_bottom_temperature
) / (g_soil + g_canopy)
return surface_air_temperature
[docs]
def update_specific_humidity(
evapotranspiration: NDArray[np.floating],
soil_evaporation: NDArray[np.floating],
specific_humidity: NDArray[np.floating],
layer_thickness: NDArray[np.floating],
density_air: NDArray[np.floating],
mm_to_kg: float,
cell_area: float,
time_interval: float,
surface_index: int,
) -> NDArray[np.floating]:
"""Update specific humidity from evapotranspiration and soil evaporation.
This function adds the water from soil evaporation and canopy evapotranspiration to
each atmospheric layer. No limits are applied at this stage and no vertical mixing.
Args:
evapotranspiration: Evapotranspiration, [mm]
soil_evaporation: Soil evaporation to surface layer, [mm]
saturated_vapour_pressure: Saturated vapour pressure, [kPa]
specific_humidity: Specific humidity, [kg kg-1]
layer_thickness: Layer thickness, [m]
density_air: Density of air, [kg m-3]
mm_to_kg: Factor to convert variable unit from millimeters to kilograms of
water per square meter
cell_area: Grid cell area, [m2]
time_interval: Time interval, [s]
surface_index: Index of surface layer
Returns:
update specific_humidity, [kg kg-1]
"""
# Convert evapotranspiration and soil evaporation [mm] to [kg m2 s-1] time interval
evapotranspiration_kg_m2 = evapotranspiration * mm_to_kg / time_interval
soil_evap_kg_m2 = soil_evaporation * mm_to_kg / time_interval
# Calculate air layer volumes [m3]
layer_volumes = layer_thickness * cell_area
air_mass_per_layer = layer_volumes * density_air
# Add ET and soil evaporation as mass flux [kg]
added_mass = np.zeros_like(layer_thickness)
added_mass += evapotranspiration_kg_m2 * cell_area * time_interval
added_mass[surface_index] += soil_evap_kg_m2 * cell_area * time_interval
# Update water mass in air
water_mass_in_air = specific_humidity * air_mass_per_layer
water_mass_in_air += added_mass
# Convert back to specific humidity and fill layer above with reference value
specific_humidity_out = water_mass_in_air / air_mass_per_layer
specific_humidity_out[0, :] = specific_humidity[0, :]
return specific_humidity_out
[docs]
def update_humidity_vpd(
saturated_vapour_pressure: NDArray[np.floating],
specific_humidity_mixed: NDArray[np.floating],
atmospheric_pressure: NDArray[np.floating],
layer_thickness: NDArray[np.floating],
molecular_weight_ratio_water_to_dry_air: float,
dry_air_factor: float,
density_air: NDArray[np.floating],
cell_area: float,
limits_relative_humidity: tuple[float, float, float],
limits_vapour_pressure_deficit: tuple[float, float, float],
denominator_tolerance: float,
) -> dict[str, NDArray[np.floating]]:
"""Update atmospheric humidity and vapour pressure deficit.
Args:
saturated_vapour_pressure: Saturated vapour pressure, [kPa]
specific_humidity_mixed: Specific humidity vertically mixed, [kg kg-1]
atmospheric_pressure: Atmospheric pressure, [kPa]
layer_thickness: Layer thickness, [m]
molecular_weight_ratio_water_to_dry_air: Molecular weight ratio of water to dry
air, dimensionless
dry_air_factor: Complement of water_to_air_mass_ratio, accounting for dry air
density_air: Density of air, [kg m-3]
cell_area: Grid cell area, [m2]
limits_relative_humidity: Realistic bounds of relative humidity, []
limits_vapour_pressure_deficit: Realistic bounds for vapour pressure deficit,
[kPa]
denominator_tolerance: Small value to prevent division by zero
Returns:
A dictionary containing arrays of updated ``relative_humidity``,
``specific_humidity``, ``vapour_pressure`` and ``vapour_pressure_deficit`` values.
"""
# Create a mask of where the input was NaN (no true canopy)
input_nan_mask = np.isnan(specific_humidity_mixed)
# Calculate air layer volumes [m3]
layer_volumes = layer_thickness * cell_area
air_mass_per_layer = layer_volumes * density_air
# Saturation constraint and condensation
# Saturation specific humidity
saturation_specific_humidity = (
molecular_weight_ratio_water_to_dry_air
+ dry_air_factor * saturated_vapour_pressure
) / np.maximum(
atmospheric_pressure - saturated_vapour_pressure, denominator_tolerance
)
# Negative values mean supersaturation
specific_humidity_deficit = saturation_specific_humidity - specific_humidity_mixed
# Excess humidity available for condensation, [kg kg-1]
excess_specific_humidity = np.where(
specific_humidity_deficit > 0,
0.0,
specific_humidity_deficit,
)
# Convert excess to condensed water, [mm]
condensed_water_mass = -excess_specific_humidity * air_mass_per_layer
# Convert to equivalent water depth, [mm]
condensation_mm = condensed_water_mass / cell_area
# Remove excess from air, not the sign of the excess is negative
specific_humidity_updated = specific_humidity_mixed + excess_specific_humidity
# Vapour pressure [kPa]
vapour_pressure_updated = (specific_humidity_updated * atmospheric_pressure) / (
molecular_weight_ratio_water_to_dry_air
+ dry_air_factor * specific_humidity_updated
)
# Compute new relative humidity (%)
relative_humidity_updated = (
vapour_pressure_updated
/ np.maximum(saturated_vapour_pressure, denominator_tolerance)
) * 100
relative_humidity_updated = np.minimum(
relative_humidity_updated, limits_relative_humidity[1]
)
# Compute new VPD (Vapour Pressure Deficit) [kPa], ensure non-zero
vpd_updated = saturated_vapour_pressure - vapour_pressure_updated
vpd_updated = np.maximum(vpd_updated, limits_vapour_pressure_deficit[0])
# Map variable names to arrays
raw_outputs = {
"relative_humidity": relative_humidity_updated,
"vapour_pressure": vapour_pressure_updated,
"vapour_pressure_deficit": vpd_updated,
"specific_humidity": specific_humidity_updated,
"condensation": condensation_mm,
}
# Clean outputs while preserving intended NaNs
cleaned_outputs = {
key: set_unintended_nan_to_zero(arr, input_nan_mask)
for key, arr in raw_outputs.items()
}
return cleaned_outputs
[docs]
def calculate_latent_heat_flux(
evapotranspiration: NDArray[np.floating],
latent_heat_vapourisation: NDArray[np.floating],
time_interval: float,
) -> NDArray[np.floating]:
"""Calculate latent heat flux from evapotranspiration.
Args:
evapotranspiration: Evapotranspiration per unit area, [kg m-2]
(1 kg m-2 of water = 1 mm of water)
latent_heat_vapourisation: Latent heat of vaporisation of water, [J kg-1]
time_interval: Time interval over which flux is computed, [s]
Returns:
Latent heat flux, [W m-2]
"""
# Energy transferred as latent heat [J m-2] over the time interval
energy_j_per_m2 = evapotranspiration * latent_heat_vapourisation
# Convert to flux [W m-2] by dividing by time interval [s]
latent_heat_flux = energy_j_per_m2 / time_interval
return latent_heat_flux
[docs]
def calculate_total_absorbed_shortwave_radiation(
downward_shortwave_radiation: NDArray[np.floating],
shortwave_absorption_by_canopy: NDArray[np.floating],
fraction_par_used: float,
leaf_absorptance_non_par: float,
par_fraction: float,
) -> NDArray[np.floating]:
"""Compute total absorbed shortwave radiation contributing to leaf energy balance.
Shortwave (SW) radiation is partitioned into:
- PAR (photosynthetically active radiation)
- non-PAR (primarily near-infrared, NIR)
The plant model provides absorbed PAR. A fraction of this PAR is used
in photosynthesis, while the remainder contributes to heat. Non-PAR radiation
is assumed not to be used in photosynthesis and contributes entirely to heat
after accounting for leaf absorptance.
This function calculates the total absorbed shortwave radiation that contributes to
the leaf energy balance by combining the absorbed PAR (adjusted for photosynthesis)
and the absorbed non-PAR radiation (adjusted for leaf absorptance and vertical
distribution).
Args:
downward_shortwave_radiation: Incoming shortwave radiation [W m-2]
shortwave_absorption_by_canopy: Absorbed PAR by canopy [W m-2]
fraction_par_used: Fraction of absorbed PAR used in photosynthesis (0-1).
leaf_absorptance_non_par: Fraction of non-PAR radiation absorbed by the leaf
(0-1).
par_fraction: Fraction of total shortwave radiation that is PAR (0-1).
Returns:
Total absorbed shortwave radiation contributing to heat [W m-2]
"""
# Compute vertical distribution weights based on absorbed PAR
weights = compute_weights_from_absorbed_radiation(
radiation=shortwave_absorption_by_canopy
)
# Calculate the portion of absorbed non-PAR that contributes to heat, assuming the
# same vertical decay as PAR absorption (weights to distribute cross layers)
shortwave_non_par = downward_shortwave_radiation * (1 - par_fraction)
shortwave_non_par_absorbed = leaf_absorptance_non_par * shortwave_non_par * weights
# Calculate the portion of absorbed PAR that contributes to heat after accounting
# for photosynthesis
# TODO: #1533 we want to use light use efficiency from the plant model to determine
# the fraction of absorbed PAR used in photosynthesis; for now we use a constant
# fraction.
par_heat = shortwave_absorption_by_canopy * (1 - fraction_par_used)
# Total absorbed shortwave radiation contributing to heat
total_abs = par_heat + shortwave_non_par_absorbed
return total_abs
[docs]
def secant_solve_cells_layers(
residual_function: Callable[[NDArray[np.floating]], NDArray[np.floating]],
initial_guess: NDArray[np.floating],
maxiter_secant: int,
convergence_tolerance: float,
small_perturbation_second_guess: float,
denominator_tolerance: float,
) -> NDArray[np.floating]:
"""Vectorised secant solver for independent (cell, layer) root problems.
Args:
residual_function: Function f(T) returning residual with same shape as T.
initial_guess: Initial guess canopy temperature, shape (n_cells, n_layers).
maxiter_secant: Maximum secant iterations.
convergence_tolerance: Convergence tolerance on max absolute update.
small_perturbation_second_guess: Small perturbation for second initial guess.
denominator_tolerance: Small value to prevent division by zero in secant update.
Returns:
Root estimate canopy temperature solving f(T)=0 elementwise.
"""
previous_temperature = initial_guess.copy()
current_temperature = initial_guess + small_perturbation_second_guess
previous_residual = residual_function(previous_temperature)
current_residual = residual_function(current_temperature)
for _ in range(maxiter_secant):
denom = current_residual - previous_residual
# Ensure no division by zero and sign conserved
safe_denom = np.where(
np.abs(denom) < denominator_tolerance,
np.copysign(denominator_tolerance, denom),
denom,
)
denom = np.where(np.isnan(safe_denom), np.nan, safe_denom)
next_temperature = (
current_temperature
- current_residual * (current_temperature - previous_temperature) / denom
)
next_residual = residual_function(next_temperature)
max_update = np.nanmax(np.abs(next_temperature - current_temperature))
if max_update < convergence_tolerance:
return next_temperature
previous_temperature, current_temperature = (
current_temperature,
next_temperature,
)
previous_residual, current_residual = (
current_residual,
next_residual,
)
return current_temperature
[docs]
def make_canopy_residual(
state: dict[str, Any],
static: dict[str, Any],
aerodynamic_resistance: NDArray[np.floating],
abiotic_constants: AbioticConstants,
core_constants: CoreConstants,
surface_index: int,
) -> Callable[[NDArray[np.floating]], NDArray[np.floating]]:
"""Creates a residual function for canopy temperature to be used in root finding.
Args:
state: Dictionary containing state variables needed for the energy balance
residual.
static: Dictionary containing static variables needed for the energy balance
residual.
aerodynamic_resistance: Aerodynamic resistance of canopy, [s m-1]
abiotic_constants: Constants related to abiotic processes.
core_constants: Core constants.
surface_index: Row index of surface layer.
Returns:
A function that takes canopy_temperature as input and returns the energy balance
residual.
"""
def residual(canopy_temperature):
return calculate_energy_balance_residual(
canopy_temperature_initial=canopy_temperature,
air_temperature=state["air_temperature"],
evapotranspiration=state["evapotranspiration"],
absorbed_shortwave_radiation=state["shortwave_absorption"],
absorbed_longwave_radiation=static["absorbed_longwave_radiation"],
specific_heat_air=state["specific_heat_air"],
density_air=state["density_air"],
aerodynamic_resistance=aerodynamic_resistance,
latent_heat_vapourisation=state["latent_heat_vapourisation"],
surface_index=surface_index,
leaf_emissivity=abiotic_constants.leaf_emissivity,
stefan_boltzmann_constant=core_constants.stefan_boltzmann_constant,
zero_Celsius=core_constants.zero_Celsius,
seconds_to_hour=core_constants.seconds_to_hour,
return_fluxes=False,
)
return residual