Source code for virtual_ecosystem.models.hydrology.above_ground

"""The ``models.hydrology.above_ground`` module simulates the above-ground hydrological
processes for the Virtual Ecosystem. At the moment, this includes rain water
interception by the canopy, canopy evaporation, soil evaporation,
and functions related to surface runoff, bypass flow, and river discharge.

TODO change temperatures to Kelvin

"""  # noqa: D205

from math import sqrt

import numpy as np
from numpy.typing import NDArray
from pyrealm.constants import CoreConst as PyrealmCoreConst
from pyrealm.core.hygro import calc_vp_sat

from virtual_ecosystem.core.grid import Grid
from virtual_ecosystem.core.logger import LOGGER
from virtual_ecosystem.models.abiotic.abiotic_tools import (
    calculate_slope_of_saturated_pressure_curve,
)


[docs] def potential_evaporation_leaf( net_radiation: NDArray[np.floating], vapour_pressure_deficit: NDArray[np.floating], air_temperature: NDArray[np.floating], density_air_kg: NDArray[np.floating], specific_heat_air: NDArray[np.floating], aerodynamic_resistance_canopy: NDArray[np.floating], stomatal_resistance: NDArray[np.floating], latent_heat_vapourisation: NDArray[np.floating], psychrometric_constant: NDArray[np.floating], saturated_pressure_slope_parameters: tuple[float, float, float, float], ): r"""Calculate canopy potential evaporation rate using Penman-Monteith equation. The potential evaporation rate :math:`EW_{0}` is calculated as follows: .. math:: EW_{0} = \frac{\Delta R_n + \rho_a c_p \frac{D}{r_a}} {\lambda_v \left(\Delta + \gamma \left(1 + \frac{r_s}{r_a}\right)\right)} where :math:`\Delta` is the slope of the saturation vapour pressure curve, :math:`R_n` is the net radiation, :math:`\rho_a` is the density of air, :math:`c_p` is the specific heat of air, :math:`D` is the vapour pressure deficit, :math:`r_a` is the aerodynamic resistance of canopy, :math:`\lambda_v` is the latent heat of vapourization, :math:`\gamma` is the psychrometric constant, and :math:`r_s` is the stomatal resistance. Note that we do NOT include ground heat flux in the consideration of canopy evaporation; TODO discuss where we use instead the energy flux into NPP Args: net_radiation: Net radiation at leaf surface, [W m-2] vapour_pressure_deficit: Vapour pressure deficit, [kPa] air_temperature: Air temperature, [C] density_air_kg: Air density, [kg m-3] specific_heat_air: Specific heat of air, [kJ kg-1 K-1] aerodynamic_resistance_canopy: Aerodynamic resistance in canopy, [s m-1] stomatal_resistance: Stomatal resistance, [s m-1] latent_heat_vapourisation: Latent heat of vapourisation, [kJ kg-1] psychrometric_constant: Psychrometric constant, [kPa K-1] saturated_pressure_slope_parameters: List of parameters to calculate the slope of the saturated vapour pressure curve Returns: potential evaporation rate, [kg m-2 s-1] """ # Slope of saturation vapour pressure curve (kPa/K) delta = calculate_slope_of_saturated_pressure_curve( temperature=air_temperature, saturated_pressure_slope_parameters=saturated_pressure_slope_parameters, ) # Penman-Monteith equation potential_evaporation = ( delta * net_radiation + density_air_kg * specific_heat_air * (vapour_pressure_deficit / aerodynamic_resistance_canopy) ) / ( latent_heat_vapourisation * ( delta + psychrometric_constant * (1 + stomatal_resistance / aerodynamic_resistance_canopy) ) ) return potential_evaporation
[docs] def calculate_canopy_evaporation( leaf_area_index: NDArray[np.floating], interception: NDArray[np.floating], net_radiation: NDArray[np.floating], vapour_pressure_deficit: NDArray[np.floating], air_temperature: NDArray[np.floating], density_air_kg: NDArray[np.floating], specific_heat_air: NDArray[np.floating], aerodynamic_resistance_canopy: NDArray[np.floating], stomatal_resistance: NDArray[np.floating], latent_heat_vapourisation: NDArray[np.floating], psychrometric_constant: NDArray[np.floating], saturated_pressure_slope_parameters: tuple[float, float, float, float], time_interval: float, extinction_coefficient_global_radiation: float, ) -> NDArray[np.floating]: r"""Calculate evaporation of intercepted water from the canopy, [mm]. This function calculates evaporation of intercepted water from the canopy following the LISFLOOD model :cite:t:`van_der_knijff_lisflood_2010`. The maximum evaporation per time step :math:`EW_{max}` [mm] is proportional to the fraction of vegetated area: .. math :: EW_{max} = EW_{0} [1 - e^{(-\kappa_{gb} LAI)}] \Delta t where :math:`EW_{0}` is the potential evaporation rate, the dimensionless constant :math:`\kappa_{gb}` is the extinction coefficient for global solar radiation. In LISFLOOD, :math:`\kappa_{gb}` is given by the product :math:`0.75 \cdot \kappa_{df}`, where :math:`\kappa_{df}` is the extinction coefficient for diffuse visible light: its value is provided as input to the model and it varies between 0.4 and 1.1. The actual amount of evaporation :math:`EW_{int}` [mm] is limited by the amount of water stored on the leaves :math:`Int_{cum}`: .. math :: EW_{int} = min(EW_{max} \Delta t, Int_{cum}) Leaf drainage is not modelled explicitly given the short residence time of water on the leaves compared to the model time step. Args: leaf_area_index: Leaf area index, [m m-1] interception: Interception of water in canopy, [mm] net_radiation: Net radiation in canopy, [W m-2] vapour_pressure_deficit: Vapour pressure deficit, [kPa] air_temperature: Air temperature in canopy, [C] density_air_kg: Density of air, [kg m-3] specific_heat_air: Specific heat of air, [kJ kg-1 K-1] aerodynamic_resistance_canopy: Aerodynamic resistance in canopy, [s m-1] stomatal_resistance: Stomatal resistance, [s m-1] latent_heat_vapourisation: Latent heat of vapourisation, [kJ kg-1] psychrometric_constant: Psychrometric constant, [kPa K-1] saturated_pressure_slope_parameters: List of parameters to calculate the slope of the saturated vapour pressure curve time_interval: Time interval, [s] extinction_coefficient_global_radiation: Extinction coefficient for global radiation Returns: canopy evaporation [mm per time interval] """ output = {} # Potential evaporation from open surface water, [kg m-2 s-1] potential_evaporation = potential_evaporation_leaf( net_radiation=net_radiation, vapour_pressure_deficit=vapour_pressure_deficit, air_temperature=air_temperature, density_air_kg=density_air_kg, specific_heat_air=specific_heat_air, aerodynamic_resistance_canopy=aerodynamic_resistance_canopy, stomatal_resistance=stomatal_resistance, latent_heat_vapourisation=latent_heat_vapourisation, psychrometric_constant=psychrometric_constant, saturated_pressure_slope_parameters=saturated_pressure_slope_parameters, ) # Maximum evaporation from each layer, [mm day-1] maximum_evaporation = ( potential_evaporation * (1.0 - np.exp(-extinction_coefficient_global_radiation * leaf_area_index)) * time_interval ) # Avoid division by zero by replacing 0s with np.nan temporarily with np.errstate(divide="ignore", invalid="ignore"): scale_factor = np.where( maximum_evaporation > 0, np.minimum(interception / maximum_evaporation, 1.0), 0.0, ) # Actual evaporation, constrained by energy and water actual_evaporation = maximum_evaporation * scale_factor output["canopy_evaporation"] = np.where( np.isnan(leaf_area_index), np.nan, actual_evaporation ) # Update interception pool after evaporation # Ensure no negative interception return np.maximum(interception - actual_evaporation, 0.0)
[docs] def calculate_soil_evaporation( temperature: NDArray[np.floating], relative_humidity: NDArray[np.floating], atmospheric_pressure: NDArray[np.floating], soil_moisture: NDArray[np.floating], soil_moisture_residual: float | NDArray[np.floating], soil_moisture_saturation: float | NDArray[np.floating], leaf_area_index: NDArray[np.floating], wind_speed_surface: NDArray[np.floating], density_air: float | NDArray[np.floating], latent_heat_vapourisation: float | NDArray[np.floating], gas_constant_water_vapour: float, drag_coefficient_evaporation: float, extinction_coefficient_global_radiation: float, time_interval: float, pyrealm_core_constants: PyrealmCoreConst, ) -> dict[str, NDArray[np.floating]]: r"""Calculate soil evaporation based on classical bulk aerodynamic formulation. This function uses the so-called 'alpha' method to estimate the evaporative flux :cite:p:`mahfouf_comparative_1991`. We here use the implementation by :cite:t:`barton_parameterization_1979`: .. math :: \alpha = \frac{1.8 \Theta}{\Theta + 0.3} .. math :: E_{g} = \frac{\rho_{air}}{R_{a}} (\alpha q_{sat}(T_{s}) - q_{g}) where :math:`\Theta` is the available top soil moisture (relative volumetric water content), :math:`E_{g}` is the evaporation flux (W m-2), :math:`\rho_{air}` is the density of air (kg m-3), :math:`R_{a}=(C_{E} u_{a})^-1` is the aerodynamic resistance, with :math:`C_{E}` the drag coefficient for evaporation and :math:`u_{a}` the wind speed near the surface, :math:`q_{sat}(T_{s})` (unitless) is the saturated specific humidity, and :math:`q_{g}` is the surface specific humidity (unitless). In a final step, the bare soil evaporation is adjusted to shaded soil evaporation :cite:t:`supit_system_1994`: .. math :: E_{act} = E_{g} e^{(-\kappa_{gb} LAI)} where :math:`\kappa_{gb}` is the extinction coefficient for global radiation, and :math:`LAI` is the total leaf area index. Args: temperature: Air temperature at reference height, [C] relative_humidity: Relative humidity at reference height, [] atmospheric_pressure: Atmospheric pressure at reference height, [kPa] soil_moisture: Volumetric relative water content, [unitless] soil_moisture_residual: Residual soil moisture, [unitless] soil_moisture_saturation: Soil moisture saturation, [unitless] wind_speed_surface: Wind speed in the bottom air layer, [m s-1] density_air: Density if air, [kg m-3] latent_heat_vapourisation: Latent heat of vapourisation, [kJ kg-1] leaf_area_index: Leaf area index [m m-1] gas_constant_water_vapour: Gas constant for water vapour, [kJ kg-1 K-1] drag_coefficient_evaporation: Drag coefficient for evaporation, dimensionless extinction_coefficient_global_radiation: Extinction coefficient for global radiation, [unitless] time_interval: Time interval, [s] pyrealm_core_constants: Core constants from pyrealm package Returns: soil evaporation, [mm per time interval], aerodynamic resistance soil [s m-1] """ output: dict[str, NDArray[np.floating]] = {} # Available soil moisture soil_moisture_free = np.clip( (soil_moisture - soil_moisture_residual), 0.0, (soil_moisture_saturation - soil_moisture_residual), ) # Estimate alpha using the Barton (1979) equation barton_ratio = (1.8 * soil_moisture_free) / (soil_moisture_free + 0.3) alpha = np.where(barton_ratio > 1, 1, barton_ratio) # Calculate saturation vapour pressure, kPa saturation_vapour_pressure = calc_vp_sat( ta=temperature, core_const=pyrealm_core_constants, ) saturated_specific_humidity = ( gas_constant_water_vapour * saturation_vapour_pressure ) / ( atmospheric_pressure - (1 - gas_constant_water_vapour) * saturation_vapour_pressure ) specific_humidity_air = (relative_humidity * saturated_specific_humidity) / 100 aerodynamic_resistance_soil = 1 / ( wind_speed_surface * drag_coefficient_evaporation ) output["aerodynamic_resistance_soil"] = aerodynamic_resistance_soil evaporative_flux = (density_air / aerodynamic_resistance_soil) * ( alpha * saturation_vapour_pressure - specific_humidity_air ) # Prevent negative evaporation evaporative_flux = np.maximum(evaporative_flux, 0.0) output["soil_evaporation"] = ( ( # Return surface evaporation, [mm] evaporative_flux / latent_heat_vapourisation ).squeeze() * np.exp(-extinction_coefficient_global_radiation * leaf_area_index) * time_interval ) return output
[docs] def find_lowest_neighbour( neighbours: list[np.ndarray], elevation: np.ndarray, ) -> list[int]: """Find lowest neighbour for each grid cell from digital elevation model. This function finds the cell IDs of the lowest neighbour for each grid cell. This can be used to determine in which direction surface runoff flows. Args: neighbours: List of neighbour IDs elevation: Elevation, [m] Returns: list of lowest neighbour IDs """ lowest_neighbour = [] for cell_id, neighbors_id in enumerate(neighbours): downstream_id_loc = np.argmax(elevation[cell_id] - elevation[neighbors_id]) lowest_neighbour.append(neighbors_id[downstream_id_loc]) return lowest_neighbour
[docs] def find_upstream_cells(lowest_neighbour: list[int]) -> list[list[int]]: """Find all upstream cell IDs for all grid cells. This function identifies all cell IDs that are upstream of each grid cell. This can be used to calculate the water flow that goes though a grid cell. Args: lowest_neighbour: List of lowest neighbour cell IDs Returns: lists of all upstream IDs for each grid cell """ upstream_ids: list = [[] for i in range(len(lowest_neighbour))] for down_s, up_s in enumerate(lowest_neighbour): upstream_ids[up_s].append(down_s) return upstream_ids
[docs] def route_horizontal_flow( drainage_map: dict[int, list[int]], surface_runoff: np.ndarray, subsurface_runoff: np.ndarray, ) -> np.ndarray: """Route horizontal flow for each grid cell (instantaneous channel routing). This function calculates the total river discharge at each grid cell for the current timestep by combining: 1. Local generation: the water generated in the cell itself during the timestep, including surface runoff and subsurface (lateral + baseflow) runoff. 2. Inflow from upstream cells: contributions from all cells that drain into the current cell, using their local generation from the same timestep. No flows from previous timesteps are included, avoiding double-counting, and there is also no time delay, so all the water runs through the whole grid in one time step . Args: drainage_map: Dict mapping each cell ID -> list of upstream cell IDs surface_runoff: Surface runoff for this timestep, [mm] subsurface_runoff: Subsurface runoff for this timestep, [mm] Returns: Total river discharge at each grid cell, [mm] """ # local generation in this cell (surface + subsurface) local_generation = np.nan_to_num(surface_runoff, nan=0.0) + np.nan_to_num( subsurface_runoff, nan=0.0 ) inflow_from_upstream = np.zeros_like(local_generation) for cell_id, upstream_ids in enumerate(drainage_map.values()): if upstream_ids: inflow_from_upstream[cell_id] = np.sum(local_generation[upstream_ids]) total_river_discharge = local_generation + inflow_from_upstream if (total_river_discharge < 0.0).any(): to_raise = ValueError("The river discharge should not be negative!") LOGGER.error(to_raise) raise to_raise return total_river_discharge
[docs] def calculate_drainage_map(grid: Grid, elevation: np.ndarray) -> dict[int, list[int]]: """Calculate drainage map based on digital elevation model. This function finds the lowest neighbour for each grid cell, identifies all upstream cell IDs and creates a dictionary that provides all upstream cell IDs for each grid cell. This function currently supports only square grids. Args: grid: Grid object elevation: Elevation, [m] Returns: dictionary of cell IDs and their upstream neighbours TODO move this to core.grid once we decided on common use """ if grid.grid_type != "square": to_raise = ValueError("This grid type is currently not supported!") LOGGER.error(to_raise) raise to_raise # Establish neighbour relationships grid.set_neighbours(distance=sqrt(grid.cell_area)) # Find flow direction: each cell -> lowest neighbor lowest_neighbours = find_lowest_neighbour(grid.neighbours, elevation) n_cells = len(lowest_neighbours) # Build reverse graph: for each cell, who drains into it direct_upstream: dict[int, list[int]] = {i: [] for i in range(n_cells)} for cell, ln in enumerate(lowest_neighbours): if ln is not None: # sink cells have no lowest neighbor direct_upstream[ln].append(cell) # Recursive collection of all upstream cells def collect_upstream(cell: int, visited=None) -> list[int]: if visited is None: visited = set() for up in direct_upstream[cell]: if up not in visited: visited.add(up) collect_upstream(up, visited) return list(visited) # Compute upstream IDs for all cells upstream_ids = {cell: collect_upstream(cell) for cell in range(n_cells)} return upstream_ids
[docs] def calculate_interception( leaf_area_index: NDArray[np.floating], precipitation: NDArray[np.floating], intercept_parameters: tuple[float, float, float], veg_density_param: float, ) -> NDArray[np.floating]: r"""Estimate canopy interception. This function estimates canopy interception using the following storage-based equation after :cite:t:`aston_rainfall_1979` and :cite:t:`merriam_note_1960` as implemented in :cite:t:`van_der_knijff_lisflood_2010` : .. math :: Int = S_{max} [1 - e^{\frac{-k R \Delta t}{S_{max}}}] where :math:`Int` [mm] is the interception per time step, :math:`S_{max}` [mm] is the maximum interception, :math:`R` is the rainfall intensity per time step [mm] and the factor :math:`k` accounts for the density of the vegetation. :math:`S_{max}` is calculated using an empirical equation :cite:p:`von_hoyningen-huene_interzeption_1981`: .. math:: :nowrap: \[ S_{max} = \begin{cases} 0.935 + 0.498 \cdot \text{LAI} - 0.00575 \cdot \text{LAI}^{2}, & \text{LAI} > 0.1 \\ 0, & \text{LAI} \le 0.1, \end{cases} \] where LAI is the average Leaf area index [m1 m-1]. :math:`k` is estimated as: :math:`k=0.046 \cdot LAI` Args: leaf_area_index: Leaf area index for all canopy layers, [m m-1] precipitation: Precipitation, [mm] intercept_parameters: Parameters for equation estimating maximum canopy interception capacity. veg_density_param: Parameter used to estimate vegetation density for maximum canopy interception capacity estimate Returns: interception, [mm] """ capacity = ( intercept_parameters[0] + intercept_parameters[1] * leaf_area_index - intercept_parameters[2] * leaf_area_index**2 ) max_capacity = np.where(leaf_area_index > 0.1, capacity, 0.001) canopy_density_factor = veg_density_param * leaf_area_index interception = np.full_like(leaf_area_index, np.nan) interception[1] = max_capacity[1] * ( 1 - np.exp(-canopy_density_factor[1] * precipitation / max_capacity[1]) ) for layer in np.arange(2, len(leaf_area_index)): interception[layer] = max_capacity[layer] * ( 1 - np.exp( -canopy_density_factor[layer] * (precipitation - np.nansum(interception[:layer], axis=0)) / max_capacity[layer] ) ) return interception
[docs] def distribute_monthly_rainfall( total_monthly_rainfall: NDArray[np.floating], num_days: int, p_wet_wet: float, p_wet_dry: float, shape_parameter: float, scale_parameter: float, seed: int | None = None, ) -> NDArray[np.floating]: r"""Distribute monthly to daily rainfall using stochastic weather generator. Daily rainfall occurrence is simulated using a first-order Markov chain , and rainfall intensity on wet days is drawn from a Gamma distribution :cite:p:`katz_precipitation_1977`. Wet/dry occurrence model ------------------------ Let :math:`S_{t}` be the rainfall state on day :math:`t`: .. math:: S_t = \begin{cases} 1 & \text{wet day} \\ 0 & \text{dry day} \end{cases} The probability of rainfall depends on the previous day's state: .. math:: P(S_t = 1 \mid S_{t-1} = 1) = p_{ww} .. math:: P(S_t = 1 \mid S_{t-1} = 0) = p_{wd} Rainfall intensity model ------------------------ Rainfall on wet days is sampled from a Gamma distribution: .. math:: x_i \sim \text{Gamma}(k, \theta) where :math:`k` is a shape parameter (dimensionless), and :math:`\theta` is a scale parameter, (dimensionless). The sampled intensities are then scaled so that their sum equals the specified monthly rainfall total: .. math:: r_{i} = \frac{x_i}{\sum_{j=1}^{n} x_j} P where :math:`r_{i}` is the rainfall on day :math:`i` [mm], :math:`x_{i}` is the sampled Gamma intensity, and :math:`P` is the total monthly rainfall [mm]. Args: total_monthly_rainfall: Total rainfall per month [mm]. num_days: Number of days in the month. p_wet_wet: Probability a wet day follows a wet day. Typical values are 0.5-0.7 temperate climates; 0.6-0.8 humid climates; 0.3-0.5 arid climates p_wet_dry: Probability a wet day follows a dry day. Typical values are 0.1-0.3 arid climates; 0.2-0.4 temperate climates; 0.3-0.5 tropical climates shape_parameter: Shape parameter of the Gamma distribution controlling rainfall variability. Typical values are 0.7-1.0 intense storms / high variability; 1.0-2.0 moderate variability (common default 1.5); 2.0-4.0 more uniform rainfall scale_parameter: Scale parameter of the Gamma distribution controlling absolute magnitude of rainfall, typically 1.0. seed: Seed for random number generator (optional). Returns: Daily rainfall array (len(total_monthly_rainfall), num_days), [mm]. Raises: ValueError: if any input is invalid (negative rainfall, invalid probabilities, etc.) """ # Input validation if np.any(total_monthly_rainfall < 0): raise ValueError("Monthly rainfall values cannot be negative") if num_days <= 0: raise ValueError("num_days must be greater than 0") if not (0 <= p_wet_wet <= 1): raise ValueError("p_wet_wet must be between 0 and 1") if not (0 <= p_wet_dry <= 1): raise ValueError("p_wet_dry must be between 0 and 1") if shape_parameter <= 0: raise ValueError("shape_parameter must be positive") if scale_parameter <= 0: raise ValueError("scale_parameter must be positive") # Initialize random number generator (rng) and rainfall array rng = np.random.default_rng(seed) daily_rainfall_data = np.zeros((len(total_monthly_rainfall), num_days)) for m, total in enumerate(total_monthly_rainfall): states = np.zeros(num_days, dtype=int) # Initialize first day states[0] = rng.random() < p_wet_dry # Simulate Markov chain for wet/dry days for d in range(1, num_days): if states[d - 1] == 1: states[d] = rng.random() < p_wet_wet else: states[d] = rng.random() < p_wet_dry wet_days = np.where(states == 1)[0] if len(wet_days) == 0 or total <= 0: continue # Simulate rainfall intensities intensities = rng.gamma( shape=shape_parameter, scale=scale_parameter, size=len(wet_days) ) intensities *= total / intensities.sum() daily_rainfall_data[m, wet_days] = intensities return np.nan_to_num(np.array(daily_rainfall_data), nan=0.0)
[docs] def calculate_bypass_flow( top_soil_moisture: NDArray[np.floating], sat_top_soil_moisture: NDArray[np.floating], available_water: NDArray[np.floating], bypass_flow_coefficient: float, ) -> NDArray[np.floating]: r"""Calculate preferential bypass flow. Bypass flow is here defined as the flow that bypasses the soil matrix and drains directly to the groundwater. During each time step, a fraction of the water that is available for infiltration is added to the groundwater directly (i.e. without first entering the soil matrix). It is assumed that this fraction is a power function of the relative saturation of the superficial and upper soil layers. This results in the following equation (after :cite:t:`van_der_knijff_lisflood_2010`): .. math :: D_{pref, gw} = W_{av} (\frac{w_{1}}{w_{s1}})^{c_{pref}} where :math:`D_{pref, gw}` is the amount of preferential flow per time step [mm], :math:`W_{av}` is the amount of water that is available for infiltration, and :math:`c_{pref}` is an empirical shape parameter. This parameter affects how much of the water available for infiltration goes directly to groundwater via preferential bypass flow; a value of 0 means all surface water goes directly to groundwater, a value of 1 gives a linear relation between soil moisture and bypass flow. The equation returns a preferential flow component that becomes increasingly important as the soil gets wetter. Args: top_soil_moisture: Soil moisture of top soil layer, [mm] sat_top_soil_moisture: Soil moisture of top soil layer at saturation, [mm] available_water: Amount of water available for infiltration, [mm] bypass_flow_coefficient: Bypass flow coefficient, dimensionless Returns: preferential bypass flow, [mm] """ return ( available_water * (top_soil_moisture / sat_top_soil_moisture) ** bypass_flow_coefficient )
[docs] def convert_mm_flow_to_m3_per_second( river_discharge_mm: NDArray[np.floating], area: int | float, days: int, seconds_to_day: float, meters_to_millimeters: float, ) -> NDArray[np.floating]: """Convert river discharge from millimeters to m3 s-1. Args: river_discharge_mm: Total river discharge, [mm] area: Area of each grid cell, [m2] days: Number of days seconds_to_day: Second to day conversion factor meters_to_millimeters: Factor to convert between millimeters and meters Returns: river discharge rate for each grid cell, [m3 s-1] """ return river_discharge_mm / meters_to_millimeters / days / seconds_to_day * area
[docs] def calculate_surface_runoff( precipitation_surface: NDArray[np.floating], top_soil_moisture: NDArray[np.floating], top_soil_moisture_saturation: NDArray[np.floating], ) -> NDArray[np.floating]: """Calculate surface runoff, [mm]. Surface runoff is calculated with a simple bucket model based on :cite:t:`davis_simple_2017`: if precipitation exceeds top soil moisture saturation , the excess water is added to runoff and top soil moisture is set to soil moisture saturation value; if the top soil is not saturated, precipitation is added to the current soil moisture level and runoff is set to zero. TODO adjust saturation to account for new set of soil layers #535 Args: precipitation_surface: Precipitation that reaches surface, [mm] top_soil_moisture: Water content of top soil layer, [mm] top_soil_moisture_saturation: Soil mositure saturation of top soil layer, [mm] """ # Calculate how much water can be added to soil before saturation is reached, [mm] free_saturation_mm = top_soil_moisture_saturation - top_soil_moisture # Calculate daily surface runoff of each grid cell, [mm]; replace by SPLASH return np.where( precipitation_surface > free_saturation_mm, precipitation_surface - free_saturation_mm, 0, )