# Source code for virtual_ecosystem.models.plants.exporter
"""The exporter module provides the CommunityDataExporter, which is used to control the
output of plant community data at each time step. An instance of the class is required
by the PlantsModel, which calls the ``dump()`` method within the setup and update steps
to export data continuously during the model run.
The exporter can be configured to write three different levels of data: cohort level
data and canopy structure data at both the community and individual stem levels. The
data being exported is best structured as data frames and is highly ragged across cells,
so is less well suited for export through the central data object.
""" # noqa: D205
from __future__ import annotations
from pathlib import Path
from typing import ClassVar
import numpy as np
import pandas as pd
from pyrealm.demography.canopy import Canopy, CohortCanopyData, CommunityCanopyData
from pyrealm.demography.community import Cohorts
from pyrealm.demography.tmodel import StemAllocation, StemAllometry
from virtual_ecosystem.core.exceptions import ConfigurationError
from virtual_ecosystem.core.logger import LOGGER
from virtual_ecosystem.models.plants.biomasses import Biomasses
from virtual_ecosystem.models.plants.communities import PlantCommunities
from virtual_ecosystem.models.plants.model_config import PlantsExportConfig
# [docs]
class CommunityDataExporter:
    """The CommunityDataExporter class.

    The class is used to export detailed plant community data from inside a PlantsModel
    instance to CSV files. The community data is split across three output files:

    * cohort data: details about the stems in each cohort, including the stem allometry
      and the GPP allocation of the stem. The stem GPP allocation is not defined during
      the model setup, so these attributes are set to ``np.nan`` for the initial output.
    * community canopy data: community wide data on the canopy structure, such as the
      heights of the canopy layers and the light transmission profile.
    * stem canopy data: details of contribution in leaf area and fAPAR from each stem to
      the community canopy model.

    The data are written to standard file names in the provided output directory, which
    will typically be the output directory used by the Virtual Ecosystem model run. The
    ``required_data`` attribute is used to set which data to export by providing a set
    of values from: ``cohorts``, ``community_canopy`` and ``stem_canopy``.

    In addition, the attribute arguments can be used to specify a subset of data
    attributes to be exported. If an empty attribute set is provided (which is the
    default) then the exporter will write all attributes, otherwise the exported data
    will be reduced to just the named attributes. Every output also carries
    ``cell_id``, ``time`` and ``time_index`` columns, any of which can be named in a
    subset.

    Args:
        output_directory: The output directory for the files
        required_data: A set of the required data outputs.
        cohort_attributes: An optional subset of cohort attributes to export
        community_canopy_attributes: An optional subset of community canopy attributes
            to export
        stem_canopy_attributes: An optional subset of stem canopy attributes
            to export
        float_format: A float format string used when writing data.
    """

    _outputs: ClassVar[dict[str, tuple[str, str]]] = dict(
        cohorts=(
            "plants_cohort_data.csv",
            "_cohort_path",
        ),
        community_canopy=(
            "plants_community_canopy_data.csv",
            "_community_canopy_path",
        ),
        stem_canopy=(
            "plants_stem_canopy_data.csv",
            "_stem_canopy_path",
        ),
    )
    """Connects the export data options to a tuple of standard output file and
    internal path attribute names."""

    # ``time_index`` is written by every dump method alongside ``time``, so it has to
    # be listed here or a subset that names it would be rejected as unknown.
    available_attributes: ClassVar[dict[str, set[str]]] = {
        "cohort_attributes": {
            "cell_id",
            "time",
            "time_index",
            *StemAllometry.array_attrs,
            *Cohorts.array_attrs,
            *StemAllocation.array_attrs,
            *Biomasses.array_attrs,
        },
        "community_canopy_attributes": {
            "canopy_layer_index",
            "heights",
            "cell_id",
            "time",
            "time_index",
            *CommunityCanopyData.array_attrs,
        },
        "stem_canopy_attributes": {
            "canopy_layer_index",
            "cohort_id",
            "cell_id",
            "time",
            "time_index",
            *CohortCanopyData.array_attrs,
        },
    }
    """Class variable of the available attributes that can be exported for each export
    option."""
def __init__(
    self,
    output_directory: Path,
    required_data: set[str] = set(),
    cohort_attributes: set[str] = set(),
    community_canopy_attributes: set[str] = set(),
    stem_canopy_attributes: set[str] = set(),
    float_format: str = "%0.5f",
) -> None:
    """Validate the export settings and resolve the output file paths.

    Args:
        output_directory: The output directory for the files.
        required_data: A set of the required data outputs.
        cohort_attributes: An optional subset of cohort attributes to export.
        community_canopy_attributes: An optional subset of community canopy
            attributes to export.
        stem_canopy_attributes: An optional subset of stem canopy attributes to
            export.
        float_format: A float format string used when writing data.

    Raises:
        ConfigurationError: If ``required_data`` contains an unknown option, the
            output directory is missing or not a directory, a requested output file
            already exists, or an attribute subset names an unknown attribute.
    """
    # Store the argument values. The set arguments are copied: the signature
    # defaults are single shared set objects and callers may reuse their own sets,
    # so storing them by reference would let a mutation on one instance leak into
    # others.
    self.output_directory: Path = output_directory
    """The directory in which to save plant community data."""
    self.required_data: set[str] = set(required_data)
    """The set of plant community data types to be exported."""
    self.cohort_attributes: set[str] = set(cohort_attributes)
    """A subset of cohort attribute names to export."""
    self.community_canopy_attributes: set[str] = set(community_canopy_attributes)
    """A subset of community canopy attribute names to export."""
    self.stem_canopy_attributes: set[str] = set(stem_canopy_attributes)
    """A subset of stem canopy attribute names to export."""
    self.float_format = float_format
    """The float format for data export."""

    # Type and set internal attributes
    self._output_mode: str = "w"
    """Switches the exporter between write and append mode."""
    self._write_header: bool = True
    """Stops headers being duplicated in append mode."""
    self._active: bool = True
    """Has any data export been requested."""

    # Initialise private data output path attributes - if set in required data,
    # these are updated to provide a checked path for requested data
    self._cohort_path: Path | None = None
    self._community_canopy_path: Path | None = None
    self._stem_canopy_path: Path | None = None

    # Validate the required data argument. The unknown options are sorted so that
    # the error message is the same on every run.
    unknown_options = self.required_data.difference(self._outputs)
    if unknown_options:
        msg = (
            "The required_data setting contains unknown data "
            f"output options: {', '.join(sorted(unknown_options))}"
        )
        LOGGER.error(msg)
        raise ConfigurationError(msg)

    # If no output files are required then set the exporter in the inactive state
    # and return without touching the file system.
    if not self.required_data:
        self._active = False
        LOGGER.info("Plant community data exporter not active.")
        return

    self._check_and_set_paths()
    self._check_attribute_subsets()
    LOGGER.info("Plant community data exporter active.")
def _check_and_set_paths(self) -> None:
    """Validate the output directory and resolve a path for each requested output.

    The output directory must exist and be a directory. For every option named in
    ``required_data``, the matching internal path attribute is set to the standard
    file name inside that directory; the attributes of options that were not
    requested keep their initial ``None`` value, which signals that the data should
    not be written.

    Raises:
        ConfigurationError: If the output directory does not exist or is not a
            directory, or if the file for a requested output already exists.
    """
    out_dir = self.output_directory
    if not out_dir.exists() or not out_dir.is_dir():
        msg = (
            f"The plant community data output directory does not exist or is not "
            f"a directory: {self.output_directory}"
        )
        LOGGER.error(msg)
        raise ConfigurationError(msg)

    # Lazily walk only the requested options, in the order they are declared.
    requested = (
        (option, targets)
        for option, targets in self._outputs.items()
        if option in self.required_data
    )

    for out_option, (fname, attr) in requested:
        data_path = out_dir / fname

        # Refuse to overwrite data from an earlier run.
        if data_path.exists():
            msg = f"An output file for {out_option} data already exists: {fname}"
            LOGGER.error(msg)
            raise ConfigurationError(msg)

        setattr(self, attr, data_path)
def _check_attribute_subsets(self) -> None:
    """Confirm every configured attribute subset only names available attributes.

    Each key of ``available_attributes`` is also the name of the instance attribute
    holding the requested subset for that output. Empty subsets mean "export
    everything" and are not checked.

    Raises:
        ConfigurationError: If a subset contains a name that is not available for
            its output.
    """
    for subset_name, permitted in self.available_attributes.items():
        requested = getattr(self, subset_name)
        if requested:
            not_found = requested.difference(permitted)
            if not_found:
                msg = (
                    f"The {subset_name} exporter configuration contains "
                    f"unknown attributes: {', '.join(not_found)}"
                )
                LOGGER.error(msg)
                raise ConfigurationError(msg)
# [docs]
@classmethod
def from_config(
    cls, output_directory: Path, config: PlantsExportConfig
) -> CommunityDataExporter:
    """Factory method to create a CommunityDataExporter from configuration data.

    See the documentation of
    :class:`~virtual_ecosystem.models.plants.model_config.PlantsExportConfig`
    for details of the configuration settings for this method.

    Args:
        output_directory: The path to the output directory for the files
        config: An instance of ``PlantsExportConfig``

    Returns:
        An exporter built from the four set-valued settings read from ``config``;
        ``float_format`` is left at the constructor default.
    """
    try:
        # Collect the constructor arguments as sets; the required data values are
        # passed through ``str`` to reduce Literals to plain strings.
        settings = {
            "required_data": {str(option) for option in config.required_data},
            "cohort_attributes": set(config.cohort_attributes),
            "community_canopy_attributes": set(config.community_canopy_attributes),
            "stem_canopy_attributes": set(config.stem_canopy_attributes),
        }
    except KeyError as excep:
        LOGGER.error(excep)
        raise

    return cls(output_directory=output_directory, **settings)
# [docs]
def dump(
    self,
    communities: PlantCommunities,
    biomasses: dict[int, Biomasses] | None,
    canopies: dict[int, Canopy],
    stem_allocations: dict[int, StemAllocation],
    time: np.datetime64,
    time_index: int,
) -> None:
    """Export plant community data to file.

    The main community components of the PlantsModel are passed in and handed to the
    three per-output dump methods, each of which writes only if its output was
    requested at setup. An inactive exporter does nothing.

    Args:
        communities: A PlantCommunities instance.
        biomasses: A dictionary of biomass data keyed by cell id.
        canopies: A dictionary of Canopy instances, keyed by cell id.
        stem_allocations: A dictionary of StemAllocations, also keyed by cell id
        time: A datetime to be used as a timestamp in the output files.
        time_index: The index of the datetime within the model updates.
    """
    if self._active:
        # Both timestamp values go unchanged to every dump method.
        stamp = {"time": time, "time_index": time_index}

        self._dump_cohort_data(
            communities=communities,
            biomasses=biomasses,
            canopies=canopies,
            stem_allocations=stem_allocations,
            **stamp,
        )
        self._dump_community_canopy_data(canopies=canopies, **stamp)
        self._dump_stem_canopy_data(
            communities=communities, canopies=canopies, **stamp
        )

        # After the first successful dump, all later calls append without headers.
        self._output_mode, self._write_header = "a", False
def _dump_cohort_data(
    self,
    communities: PlantCommunities,
    biomasses: dict[int, Biomasses] | None,
    canopies: dict[int, Canopy],
    stem_allocations: dict[int, StemAllocation],
    time: np.datetime64,
    time_index: int,
) -> None:
    """Dump plant cohort data to file.

    For each cell, the cohort, stem allometry, stem allocation and biomass data are
    joined column-wise; the cells are then stacked row-wise, stamped with the time
    and time index, optionally reduced to the requested attributes and written to
    the cohort CSV file.

    Args:
        communities: A PlantCommunities instance.
        biomasses: A dictionary of biomass data keyed by cell id.
        canopies: A dictionary of Canopy instances, keyed by cell id. Not used by
            this method.
        stem_allocations: A dictionary of StemAllocations, also keyed by cell id
        time: A datetime to be used as a timestamp in the output files
        time_index: The index of the datetime within the model updates.
    """
    # If the data has not been requested - so the path is None - then exit
    if self._cohort_path is None:
        return

    # Collect cell dataframes into a list for use with row-wise pd.concat()
    cohort_data = []

    for cell_id, community in communities.items():
        # The stem allocations are only defined after update so at setup, the
        # stem allocations are defined as an empty dictionary. In this case,
        # provide a data frame of np.nan values for each cohort so the columns
        # match the later appended rows.
        if stem_allocations:
            allocation = stem_allocations[cell_id].to_pandas()
        else:
            allocation = pd.DataFrame(
                {
                    key: np.full(community.n_cohorts, np.nan)
                    for key in StemAllocation.array_attrs
                }
            )

        # NOTE(review): without biomasses this frame has no columns, so if a later
        # dump does supply biomasses the appended rows will be wider than the
        # header written by the first dump - confirm callers are consistent.
        if biomasses is None:
            biomass_data = pd.DataFrame(index=np.arange(community.n_cohorts))
        else:
            biomass_data = self._export_biomass_data(biomasses[cell_id])

        # Concatenate the cohort data, stem allometry, stem allocation and biomass
        # data by column
        community_data = pd.concat(
            [
                community.cohorts.to_pandas(),
                community.stem_allometry.to_pandas(),
                allocation,
                biomass_data,
            ],
            axis=1,
        )

        # Add the cell id and append the cohorts in this community to the list
        community_data["cell_id"] = cell_id
        cohort_data.append(community_data)

    # Concatenate the cells by row and add time
    cohort_data_compiled = pd.concat(cohort_data)
    cohort_data_compiled["time"] = time
    cohort_data_compiled["time_index"] = time_index

    # Reduce to requested attributes. The names are sorted because set iteration
    # order varies between interpreter runs, which would otherwise make the CSV
    # column order irreproducible.
    if self.cohort_attributes:
        cohort_data_compiled = cohort_data_compiled[sorted(self.cohort_attributes)]

    # Export cohort data - this switches from write mode with headers to append
    # mode without headers after the first call to dump.
    cohort_data_compiled.to_csv(
        self._cohort_path,
        mode=self._output_mode,
        header=self._write_header,
        index=False,
        float_format=self.float_format,
    )
    LOGGER.info(f"Plant model cohort data dumped at time: {time}")
@staticmethod
def _export_biomass_data(biomass: Biomasses) -> pd.DataFrame:
    """Build a dataframe of biomass columns from the tissues of ``biomass``.

    Each tissue contributes one carbon mass column followed by one actual element
    mass column per entry in its ``element_masses``; tissue and element names are
    lower-cased when forming the column names.
    """
    columns: dict[str, np.ndarray] = {}

    for tissue in biomass.tissues:
        tissue_name = tissue.tissue_name.lower()
        columns[f"biomass_{tissue_name}_carbon_mass"] = tissue.carbon_mass

        # Element columns follow the carbon column, in mapping order.
        lowered = (
            (elem_name.lower(), element)
            for elem_name, element in tissue.element_masses.items()
        )
        columns.update(
            (
                f"biomass_{tissue_name}_{elem}_actual_element_mass",
                element.actual_element_mass,
            )
            for elem, element in lowered
        )

    return pd.DataFrame(columns)
def _dump_community_canopy_data(
    self,
    canopies: dict[int, Canopy],
    time: np.datetime64,
    time_index: int,
) -> None:
    """Dump community canopy data to file.

    The community data of each canopy is converted to a data frame and annotated
    with the canopy layer index (taken from the frame index), the canopy heights,
    the cell id, the time and the time index. The cells are then stacked row-wise,
    optionally reduced to the requested attributes and written to the community
    canopy CSV file.

    Args:
        canopies: A dictionary of Canopy instances, keyed by cell id.
        time: A datetime to be used as a timestamp in the output files
        time_index: The index of the datetime within the model updates.
    """
    # If the data has not been requested - so the path is None - then exit
    if self._community_canopy_path is None:
        return

    community_canopy_data = []

    for cell_id, canopy in canopies.items():
        data = canopy.community_data.to_pandas()
        data["canopy_layer_index"] = data.index
        data["heights"] = canopy.heights
        data["cell_id"] = cell_id
        data["time"] = time
        data["time_index"] = time_index
        community_canopy_data.append(data)

    # Concatenate the cells into a single data frame
    community_canopy_data_compiled = pd.concat(community_canopy_data)

    # Reduce to requested attributes. The names are sorted because set iteration
    # order varies between interpreter runs, which would otherwise make the CSV
    # column order irreproducible.
    if self.community_canopy_attributes:
        community_canopy_data_compiled = community_canopy_data_compiled[
            sorted(self.community_canopy_attributes)
        ]

    # Export community canopy data - write mode with headers on the first dump,
    # append mode without headers afterwards.
    community_canopy_data_compiled.to_csv(
        self._community_canopy_path,
        mode=self._output_mode,
        header=self._write_header,
        index=False,
        float_format=self.float_format,
    )
    LOGGER.info(f"Plant model community canopy data dumped at time: {time}")
def _dump_stem_canopy_data(
    self,
    communities: PlantCommunities,
    canopies: dict[int, Canopy],
    time: np.datetime64,
    time_index: int,
) -> None:
    """Dump stem canopy data to file.

    The cohort data of each canopy is converted to a data frame and annotated with
    the canopy layer index (taken from the frame index), the cell id, the cohort id,
    the time and the time index. The cells are then stacked row-wise, optionally
    reduced to the requested attributes and written to the stem canopy CSV file.

    Args:
        communities: A PlantCommunities instance.
        canopies: A dictionary of Canopy instances, keyed by cell id.
        time: A datetime to be used as a timestamp in the output files
        time_index: The index of the datetime within the model updates.

    Raises:
        KeyError: If a cell id in ``canopies`` has no entry in ``communities``.
    """
    # If the data has not been requested - so the path is None - then exit
    if self._stem_canopy_path is None:
        return

    stem_canopy_data = []

    for cell_id, canopy in canopies.items():
        # Look the community up by cell id rather than pairing the two mappings
        # by position, which would silently mismatch cells (or drop some) if
        # their ordering or length ever differed.
        community = communities[cell_id]

        data = canopy.cohort_data.to_pandas()
        data["canopy_layer_index"] = data.index
        data["cell_id"] = cell_id
        # Each cohort id is repeated once per canopy layer.
        # NOTE(review): assumes to_pandas() emits all layers of one cohort
        # contiguously before the next cohort - confirm against pyrealm.
        data["cohort_id"] = np.repeat(
            community.cohorts.cohort_id, len(canopy.heights)
        )
        data["time"] = time
        data["time_index"] = time_index
        stem_canopy_data.append(data)

    # Concatenate the cells into a single data frame
    stem_canopy_data_compiled = pd.concat(stem_canopy_data)

    # Reduce to requested attributes. The names are sorted because set iteration
    # order varies between interpreter runs, which would otherwise make the CSV
    # column order irreproducible.
    if self.stem_canopy_attributes:
        stem_canopy_data_compiled = stem_canopy_data_compiled[
            sorted(self.stem_canopy_attributes)
        ]

    # Export stem canopy data - write mode with headers on the first dump, append
    # mode without headers afterwards.
    stem_canopy_data_compiled.to_csv(
        self._stem_canopy_path,
        mode=self._output_mode,
        header=self._write_header,
        index=False,
        float_format=self.float_format,
    )
    LOGGER.info(f"Plant model stem canopy data dumped at time: {time}")