Source code for virtual_ecosystem.models.animal.exporter

"""The exporter module provides the
:class:`~virtual_ecosystem.models.animal.exporter.AnimalCohortDataExporter` and
:class:`~virtual_ecosystem.models.animal.exporter.ResourcePoolDataExporter` classes,
which are used to control the output of animal cohort, trophic interaction and
resource pool data at each time step. They are configured from the
:class:`~virtual_ecosystem.models.animal.model_config.AnimalExportConfig` and
:class:`~virtual_ecosystem.models.animal.model_config.ResourcePoolExportConfig`
settings, and their ``dump()`` methods are called within the setup and update steps to
export data continuously during the model run.
"""  # noqa: D205

from __future__ import annotations

from collections.abc import Iterable
from pathlib import Path
from typing import ClassVar

import numpy as np
import pandas as pd

from virtual_ecosystem.core.exceptions import ConfigurationError
from virtual_ecosystem.core.logger import LOGGER
from virtual_ecosystem.models.animal.animal_cohorts import AnimalCohort
from virtual_ecosystem.models.animal.array_resources import ResourcePool
from virtual_ecosystem.models.animal.decay import (
    CarcassPool,
    ExcrementPool,
    FungalFruitPool,
    SoilPool,
)
from virtual_ecosystem.models.animal.model_config import (
    AnimalExportConfig,
    ResourcePoolExportConfig,
)


class AnimalCohortDataExporter:
    """Exporter for detailed animal cohort and trophic interaction data.

    This class writes two CSV files into the output directory:

    * ``animal_cohort_data.csv``: one row for every cohort at every time step.
    * ``animal_trophic_interactions.csv``: one row for every entry in the
      ``trophic_record`` of every cohort at every time step.

    Each file is opened in write mode the first time data is actually written to it
    (including the header) and is subsequently appended to.

    The exporter mirrors the design of
    :class:`virtual_ecosystem.models.plants.exporter.CommunityDataExporter`.

    Args:
        output_directory: Directory where the CSV files will be created.
        cohort_attributes: Optional subset of cohort attributes to export. If ``None``
            or an empty set is provided, all available attributes are written. The
            set passed in is copied and never modified by the exporter.
        float_format: Float format string used when writing numeric data.

    Raises:
        ConfigurationError: If the output directory does not exist or is not a
            directory, if an output file already exists, or if an unknown cohort
            attribute is requested.
    """

    _outputs: ClassVar[dict[str, tuple[str, str]]] = {
        "cohorts": ("animal_cohort_data.csv", "_cohort_path"),
        "trophic": ("animal_trophic_interactions.csv", "_trophic_path"),
    }
    """Mapping from output key to (filename, path-attribute-name)."""

    required_attributes: ClassVar[tuple[str, ...]] = (
        "cohort_id",
        "time",
        "time_index",
    )
    """The output fields that are always included in cohort export."""

    available_attributes: ClassVar[set[str]] = {
        "functional_group",
        "development_type",
        "diet_type",
        "reproductive_environment",
        "age",
        "individuals",
        "is_alive",
        "is_mature",
        "time_to_maturity",
        "time_since_maturity",
        "location_status",
        "centroid_key",
        "territory_size",
        "territory",
        "occupancy_proportion",
        "largest_mass_achieved",
        "mass_carbon",
        "mass_nitrogen",
        "mass_phosphorus",
        "reproductive_mass_carbon",
        "reproductive_mass_nitrogen",
        "reproductive_mass_phosphorus",
    }
    """The set of valid attribute names that can be selected for cohort export."""

    def __init__(
        self,
        output_directory: Path,
        cohort_attributes: set[str] | None = None,
        float_format: str = "%0.5f",
    ) -> None:
        # Public configuration
        self.output_directory: Path = output_directory
        """The directory in which to save animal cohort data."""
        # Take a copy: the set is edited in place below and the caller's own set must
        # not be altered as a side effect of building an exporter.
        self.cohort_attributes: set[str] = set(cohort_attributes or ())
        """The set of animal cohort attributes to be exported."""
        self.float_format: str = float_format
        """The float format for data export."""

        # Internal state
        self._cohort_output_mode: str = "w"
        """Switches the cohort exporter between write and append mode."""
        self._trophic_output_mode: str = "w"
        """Switches the trophic exporter between write and append mode."""
        self._write_cohort_header: bool = True
        """Stops cohort headers being duplicated in append mode."""
        self._write_trophic_header: bool = True
        """Stops trophic headers being duplicated in append mode."""
        self._active: bool = True
        """Whether any data export has been requested."""
        self._cohort_path: Path | None = None
        """Sets the output path for the cohort csv."""
        self._trophic_path: Path | None = None
        """Sets the output path for the trophic csv."""

        # Remove any required headers from the cohort attributes so that the attribute
        # subset validation only checks the optional available values
        self.cohort_attributes -= set(self.required_attributes)

        self._check_and_set_paths()
        self._check_attribute_subsets()

    @classmethod
    def from_config(
        cls,
        output_directory: Path,
        config: AnimalExportConfig,
    ) -> AnimalCohortDataExporter:
        """Create an exporter from an AnimalExportConfig instance.

        If the config has ``enabled`` set to a false value, an inactive exporter is
        returned: no output directory or attribute validation is carried out and all
        ``dump`` calls return without writing anything.

        Args:
            output_directory: Directory where the CSV files will be created.
            config: Configuration section controlling animal cohort export.

        Returns:
            Initialised AnimalCohortDataExporter instance.
        """
        if not config.enabled:
            LOGGER.info("Animal cohort data exporter not active.")

            # Bypass __init__ so that a disabled exporter never validates or touches
            # the output directory.
            exporter = cls.__new__(cls)

            # Public configuration
            exporter.output_directory = output_directory
            exporter.cohort_attributes = set()
            exporter.float_format = config.float_format

            # Internal state
            exporter._cohort_output_mode = "w"
            exporter._trophic_output_mode = "w"
            exporter._write_cohort_header = True
            exporter._write_trophic_header = True
            exporter._active = False
            exporter._cohort_path = None
            exporter._trophic_path = None

            return exporter

        return cls(
            output_directory=output_directory,
            cohort_attributes=set(config.cohort_attributes),
            float_format=config.float_format,
        )

    def _check_and_set_paths(self) -> None:
        """Check and set the output paths to be used by the exporter.

        Raises:
            ConfigurationError: If the directory does not exist or is not a directory,
                or if any output file already exists.
        """
        if not (self.output_directory.exists() and self.output_directory.is_dir()):
            msg = (
                "The animal cohort data output directory does not exist or is not "
                f"a directory: {self.output_directory}"
            )
            LOGGER.error(msg)
            raise ConfigurationError(msg)

        for output_key, (fname, attr_name) in self._outputs.items():
            data_path = self.output_directory / fname

            # Refuse to overwrite the outputs of a previous run.
            if data_path.exists():
                msg = (
                    "An output file for animal cohort export already exists: "
                    f"{output_key} -> {fname}"
                )
                LOGGER.error(msg)
                raise ConfigurationError(msg)

            setattr(self, attr_name, data_path)

    def _check_attribute_subsets(self) -> None:
        """Validate that the requested attribute subset is available.

        Raises:
            ConfigurationError: If any requested attribute is unknown.
        """
        # An empty selection means "export everything", so there is nothing to check.
        if not self.cohort_attributes:
            return

        not_found = self.cohort_attributes.difference(self.available_attributes)
        if not_found:
            msg = (
                "The cohort exporter configuration contains unknown attributes: "
                f"{', '.join(sorted(not_found))}"
            )
            LOGGER.error(msg)
            raise ConfigurationError(msg)

    def _dump_cohorts(
        self,
        cohorts: Iterable[AnimalCohort],
        time: np.datetime64,
        time_index: int,
    ) -> None:
        """Write animal cohort data to CSV.

        Nothing is written, and the write/append state is left unchanged, when the
        exporter is inactive, has no cohort output path or is given no cohorts.

        Args:
            cohorts: Iterable of animal cohort objects.
            time: Timestamp to associate with this snapshot.
            time_index: The index of the datetime within the model updates.
        """
        if not self._active:
            return

        if self._cohort_path is None:
            LOGGER.debug("Animal cohort exporter called with no output path.")
            return

        rows: list[dict[str, object]] = [
            self._build_cohort_row(cohort=cohort, time=time, time_index=time_index)
            for cohort in cohorts
        ]

        if not rows:
            LOGGER.info("Animal cohort exporter called with no cohorts present.")
            return

        df = pd.DataFrame(rows)

        # Restrict to the required columns followed by the requested subset, sorted so
        # that the column order is stable between runs.
        if self.cohort_attributes:
            df = df[list(self.required_attributes) + sorted(self.cohort_attributes)]

        df.to_csv(
            self._cohort_path,
            mode=self._cohort_output_mode,
            header=self._write_cohort_header,
            index=False,
            float_format=self.float_format,
        )

        LOGGER.info("Animal model cohort data dumped at time: %s", time)

        # Flip cohort state because we actually wrote a file.
        self._cohort_output_mode = "a"
        self._write_cohort_header = False

    def _dump_trophic(
        self,
        cohorts: Iterable[AnimalCohort],
        territory_by_id: dict[str, list[int]],
        time: np.datetime64,
        time_index: int,
    ) -> None:
        """Write trophic interaction data to CSV.

        Nothing is written, and the write/append state is left unchanged, when the
        exporter is inactive, has no trophic output path or no interactions are found.

        Args:
            cohorts: Iterable of animal cohort objects.
            territory_by_id: Dictionary of str(uuid),territory pairs for lookup.
            time: Timestamp to associate with this snapshot.
            time_index: The index of the datetime within the model updates.
        """
        if not self._active:
            return

        if self._trophic_path is None:
            LOGGER.debug("Trophic exporter called with no output path.")
            return

        rows: list[dict[str, object]] = []
        for cohort in cohorts:
            rows.extend(
                self._build_trophic_rows(
                    cohort=cohort,
                    time=time,
                    territory_by_id=territory_by_id,
                    time_index=time_index,
                )
            )

        if not rows:
            LOGGER.info("Trophic exporter called with no interactions present.")
            return

        df = pd.DataFrame(rows)
        df.to_csv(
            self._trophic_path,
            mode=self._trophic_output_mode,
            header=self._write_trophic_header,
            index=False,
            float_format=self.float_format,
        )

        # Flip trophic state because we actually wrote a file.
        self._trophic_output_mode = "a"
        self._write_trophic_header = False

    def dump(
        self, cohorts: Iterable[AnimalCohort], time: np.datetime64, time_index: int
    ) -> None:
        """Write animal cohort and trophic interaction data to CSV.

        This method is a no-op if the exporter is inactive.

        Args:
            cohorts: Iterable of animal cohort objects.
            time: Timestamp to associate with this snapshot.
            time_index: The index of the datetime within the model updates.
        """
        if not self._active:
            return

        if self._cohort_path is None and self._trophic_path is None:
            LOGGER.debug("Animal exporter called with no output path.")
            return

        # Materialise once: the cohorts are iterated three times below, which would
        # exhaust a generator after the first pass.
        cohort_list = list(cohorts)
        territory_by_id = {str(cohort.id): cohort.territory for cohort in cohort_list}

        self._dump_cohorts(cohorts=cohort_list, time=time, time_index=time_index)
        self._dump_trophic(
            cohorts=cohort_list,
            territory_by_id=territory_by_id,
            time=time,
            time_index=time_index,
        )

    def _build_cohort_row(
        self,
        cohort: AnimalCohort,
        time: np.datetime64,
        time_index: int,
    ) -> dict[str, object]:
        """Build a single output row for a cohort.

        The row always contains every required and available attribute; any column
        subsetting is applied later when the rows are written.

        Args:
            cohort: Cohort to serialise.
            time: Timestamp for this snapshot.
            time_index: The index of the datetime within the model updates.

        Returns:
            Dictionary mapping column name to value.
        """
        fg = cohort.functional_group
        mass_cnp = cohort.mass_cnp
        repro_cnp = cohort.reproductive_mass_cnp

        return {
            "time": time,
            "time_index": time_index,
            "cohort_id": str(cohort.id),
            "functional_group": fg.name,
            "development_type": str(fg.development_type),
            "diet_type": str(fg.diet),
            "reproductive_environment": str(fg.reproductive_environment),
            "age": cohort.age,
            "individuals": cohort.individuals,
            "is_alive": cohort.is_alive,
            "is_mature": cohort.is_mature,
            "time_to_maturity": cohort.time_to_maturity,
            "time_since_maturity": cohort.time_since_maturity,
            "location_status": cohort.location_status,
            "centroid_key": cohort.centroid_key,
            "territory_size": cohort.territory_size,
            "territory": cohort.territory,
            "occupancy_proportion": cohort.occupancy_proportion,
            "largest_mass_achieved": cohort.largest_mass_achieved,
            "mass_carbon": mass_cnp.C,
            "mass_nitrogen": mass_cnp.N,
            "mass_phosphorus": mass_cnp.P,
            "reproductive_mass_carbon": repro_cnp.C,
            "reproductive_mass_nitrogen": repro_cnp.N,
            "reproductive_mass_phosphorus": repro_cnp.P,
        }

    def _build_trophic_rows(
        self,
        cohort: AnimalCohort,
        territory_by_id: dict[str, list[int]],
        time: np.datetime64,
        time_index: int,
    ) -> list[dict[str, object]]:
        """Build trophic interaction rows for a single cohort.

        Args:
            cohort: Consumer cohort containing a trophic_record for the timestep.
            territory_by_id: Dictionary of str(uuid),territory pairs for lookup.
            time: Timestamp for this snapshot.
            time_index: The index of the datetime within the model updates.

        Returns:
            List of dictionaries, one per entry in the cohort's ``trophic_record``,
            each carrying the C, N and P values recorded for that entry.
        """
        rows: list[dict[str, object]] = []

        for (resource_kind, resource_id), cnp in cohort.trophic_record.items():
            prey_territory: list[int] | None = None
            resource_cell_id: int | None = None

            if resource_kind == "cohort":
                # Prey is another animal cohort. The lookup yields None when the prey
                # id is not among the cohorts passed to this dump.
                prey_territory = territory_by_id.get(resource_id)
            else:
                # Resource pools are keyed by cell id
                resource_cell_id = int(resource_id)

            rows.append(
                {
                    "time": time,
                    "time_index": time_index,
                    "consumer_cohort_id": str(cohort.id),
                    "consumer_territory": cohort.territory,
                    "resource_kind": resource_kind,
                    "resource_id": resource_id,
                    "resource_cell_id": resource_cell_id,
                    "prey_territory": prey_territory,
                    "C": cnp["C"],
                    "N": cnp["N"],
                    "P": cnp["P"],
                }
            )

        return rows
class ResourcePoolDataExporter:
    """Export snapshots of resource pool state to a single CSV file.

    Every call to ``dump`` contributes one row per resource pool sub-pool. The first
    write creates the file and emits the header; later writes append to it.

    All of the animal-model resource pools are covered: carcass, excrement, fungal
    fruiting body, soil, and the plant/litter array pools. A row is identified by its
    ``pool_type``, ``pool_name``, ``sub_pool``, ``pft`` and ``cell_id`` values and
    stores the carbon, nitrogen and phosphorus masses at the time of the snapshot.

    For plant and litter array pools the snapshot reflects the pre-foraging available
    masses, because ``ResourcePool.elemental_masses`` is populated by
    ``set_resources`` at the start of each update step and is not modified in-place
    during foraging.

    Args:
        output_directory: Directory where the CSV file will be created.
        float_format: Float format string used when writing numeric data.
    """

    _outputs: ClassVar[dict[str, tuple[str, str]]] = {
        "resource_pools": ("resource_pool_data.csv", "_pool_path"),
    }
    """Mapping from output key to (filename, path-attribute-name)."""

    def __init__(
        self,
        output_directory: Path,
        float_format: str = "%0.5f",
    ) -> None:
        # Public configuration
        self.output_directory: Path = output_directory
        """The directory in which to save resource pool data."""
        self.float_format: str = float_format
        """The float format for data export."""

        # Internal state
        self._active: bool = True
        """Whether any data export has been requested."""
        self._pool_path: Path | None = None
        """Sets the output path for the resource pool csv."""
        self._output_mode: str = "w"
        """Switches the exporter between write and append mode."""
        self._write_header: bool = True
        """Stops headers being duplicated in append mode."""

        self._check_and_set_paths()

    @classmethod
    def from_config(
        cls,
        output_directory: Path,
        config: ResourcePoolExportConfig,
    ) -> ResourcePoolDataExporter:
        """Build an exporter from a ResourcePoolExportConfig instance.

        A config with ``enabled=False`` yields an inactive exporter whose ``dump``
        calls silently do nothing.

        Args:
            output_directory: Directory where the CSV file will be created.
            config: Configuration section controlling resource pool export.

        Returns:
            Initialised ResourcePoolDataExporter instance.
        """
        if config.enabled:
            return cls(
                output_directory=output_directory,
                float_format=config.float_format,
            )

        LOGGER.info("Resource pool data exporter not active.")

        # Skip __init__ so that no path checks run for a disabled exporter.
        inactive = cls.__new__(cls)
        inactive.output_directory = output_directory
        inactive.float_format = config.float_format
        inactive._active = False
        inactive._pool_path = None
        inactive._output_mode = "w"
        inactive._write_header = True
        return inactive

    def _check_and_set_paths(self) -> None:
        """Validate the output directory and record the output file paths.

        Raises:
            ConfigurationError: If the directory does not exist or is not a directory,
                or if any output file already exists.
        """
        directory = self.output_directory

        if not directory.exists() or not directory.is_dir():
            msg = (
                "The resource pool data output directory does not exist or is not "
                f"a directory: {directory}"
            )
            LOGGER.error(msg)
            raise ConfigurationError(msg)

        for key, (filename, attribute) in self._outputs.items():
            target = directory / filename

            if target.exists():
                msg = (
                    "An output file for resource pool export already exists: "
                    f"{key} -> {filename}"
                )
                LOGGER.error(msg)
                raise ConfigurationError(msg)

            setattr(self, attribute, target)

    def dump(
        self,
        carcass_pools: dict[int, list[CarcassPool]],
        excrement_pools: dict[int, list[ExcrementPool]],
        fungal_fruiting_pools: dict[int, FungalFruitPool],
        soil_pools: dict[int, dict[str, SoilPool]],
        resource_pools: list[ResourcePool],
        time: np.datetime64,
        time_index: int,
    ) -> None:
        """Write resource pool state data to CSV.

        This method is a no-op if the exporter is inactive.

        Args:
            carcass_pools: Carcass pools keyed by cell id, each containing one or more
                CarcassPool instances.
            excrement_pools: Excrement pools keyed by cell id, each containing one or
                more ExcrementPool instances.
            fungal_fruiting_pools: Fungal fruiting body pools keyed by cell id.
            soil_pools: Soil pools keyed by cell id and then by pool-type string
                (e.g. ``"bacteria"``, ``"saprotrophic_fungi"``).
            resource_pools: Flat list of plant and litter ResourcePool instances.
                Each pool's ``elemental_masses`` array holds pre-foraging available
                masses set by the most recent ``set_resources`` call.
            time: Timestamp to associate with this snapshot.
            time_index: The index of the datetime within the model updates.
        """
        if not self._active:
            return

        if self._pool_path is None:
            LOGGER.debug("Resource pool exporter called with no output path.")
            return

        # Rows are gathered in a fixed order: carcass, excrement, fungal, soil and
        # then the plant/litter array pools.
        rows: list[dict[str, object]] = [
            *self._build_carcass_rows(carcass_pools, time, time_index),
            *self._build_excrement_rows(excrement_pools, time, time_index),
            *self._build_fungal_rows(fungal_fruiting_pools, time, time_index),
            *self._build_soil_rows(soil_pools, time, time_index),
            *self._build_resource_pool_rows(resource_pools, time, time_index),
        ]

        if not rows:
            LOGGER.info("Resource pool exporter called with no pool data present.")
            return

        frame = pd.DataFrame(rows)
        frame.to_csv(
            self._pool_path,
            mode=self._output_mode,
            header=self._write_header,
            index=False,
            float_format=self.float_format,
        )

        LOGGER.info("Resource pool data dumped at time: %s", time)

        # Data has been written, so later calls append without a header.
        self._output_mode = "a"
        self._write_header = False

    @staticmethod
    def _pool_row(
        time: np.datetime64,
        time_index: int,
        pool_type: str,
        pool_name: str,
        sub_pool: str,
        pft: str,
        cell_id: int,
        carbon: object,
        nitrogen: object,
        phosphorus: object,
    ) -> dict[str, object]:
        """Assemble one output row with the column order shared by all pool types."""
        return {
            "time": time,
            "time_index": time_index,
            "pool_type": pool_type,
            "pool_name": pool_name,
            "sub_pool": sub_pool,
            "pft": pft,
            "cell_id": cell_id,
            "C": carbon,
            "N": nitrogen,
            "P": phosphorus,
        }

    def _scavengeable_pool_rows(
        self,
        pools_by_cell: dict,
        pool_type: str,
        time: np.datetime64,
        time_index: int,
    ) -> list[dict[str, object]]:
        """Build the scavengeable and decomposed rows for carcass-like pools."""
        rows: list[dict[str, object]] = []

        for cell_id, cell_pools in pools_by_cell.items():
            for pool in cell_pools:
                fractions = (
                    ("scavengeable", pool.scavengeable_cnp),
                    ("decomposed", pool.decomposed_cnp),
                )
                rows.extend(
                    self._pool_row(
                        time,
                        time_index,
                        pool_type,
                        "",
                        fraction_name,
                        "",
                        cell_id,
                        masses.C,
                        masses.N,
                        masses.P,
                    )
                    for fraction_name, masses in fractions
                )

        return rows

    def _build_carcass_rows(
        self,
        carcass_pools: dict[int, list[CarcassPool]],
        time: np.datetime64,
        time_index: int,
    ) -> list[dict[str, object]]:
        """Build output rows for all carcass pools.

        Each pool instance produces two rows: the scavengeable fraction followed by
        the decomposed fraction.

        Args:
            carcass_pools: Carcass pools keyed by cell id.
            time: Timestamp for this snapshot.
            time_index: The index of the datetime within the model updates.

        Returns:
            List of row dictionaries, two per CarcassPool instance.
        """
        return self._scavengeable_pool_rows(carcass_pools, "carcass", time, time_index)

    def _build_excrement_rows(
        self,
        excrement_pools: dict[int, list[ExcrementPool]],
        time: np.datetime64,
        time_index: int,
    ) -> list[dict[str, object]]:
        """Build output rows for all excrement pools.

        Each pool instance produces two rows: the scavengeable fraction followed by
        the decomposed fraction.

        Args:
            excrement_pools: Excrement pools keyed by cell id.
            time: Timestamp for this snapshot.
            time_index: The index of the datetime within the model updates.

        Returns:
            List of row dictionaries, two per ExcrementPool instance.
        """
        return self._scavengeable_pool_rows(
            excrement_pools, "excrement", time, time_index
        )

    def _build_fungal_rows(
        self,
        fungal_fruiting_pools: dict[int, FungalFruitPool],
        time: np.datetime64,
        time_index: int,
    ) -> list[dict[str, object]]:
        """Build output rows for all fungal fruiting body pools.

        Each pool instance produces a single row.

        Args:
            fungal_fruiting_pools: Fungal fruiting body pools keyed by cell id.
            time: Timestamp for this snapshot.
            time_index: The index of the datetime within the model updates.

        Returns:
            List of row dictionaries, one per FungalFruitPool instance.
        """
        return [
            self._pool_row(
                time,
                time_index,
                "fungal_fruiting",
                "",
                "",
                "",
                cell_id,
                fruit_pool.mass_cnp.C,
                fruit_pool.mass_cnp.N,
                fruit_pool.mass_cnp.P,
            )
            for cell_id, fruit_pool in fungal_fruiting_pools.items()
        ]

    def _build_soil_rows(
        self,
        soil_pools: dict[int, dict[str, SoilPool]],
        time: np.datetime64,
        time_index: int,
    ) -> list[dict[str, object]]:
        """Build output rows for all soil pools.

        Each (cell, pool-type) combination produces a single row.

        Args:
            soil_pools: Soil pools keyed by cell id and pool-type string
                (e.g. ``"bacteria"``, ``"saprotrophic_fungi"``).
            time: Timestamp for this snapshot.
            time_index: The index of the datetime within the model updates.

        Returns:
            List of row dictionaries, one per SoilPool instance.
        """
        return [
            self._pool_row(
                time,
                time_index,
                "soil",
                soil_name,
                "",
                "",
                cell_id,
                soil_pool.mass_cnp.C,
                soil_pool.mass_cnp.N,
                soil_pool.mass_cnp.P,
            )
            for cell_id, cell_soil_pools in soil_pools.items()
            for soil_name, soil_pool in cell_soil_pools.items()
        ]

    def _build_resource_pool_rows(
        self,
        resource_pools: list[ResourcePool],
        time: np.datetime64,
        time_index: int,
    ) -> list[dict[str, object]]:
        """Build output rows for all plant and litter array resource pools.

        Each (pool, cell) combination produces a single row. The C, N, and P masses
        are taken from ``ResourcePool.elemental_masses``, which holds pre-foraging
        available masses populated by the most recent ``set_resources`` call.

        Args:
            resource_pools: Flat list of ResourcePool instances.
            time: Timestamp for this snapshot.
            time_index: The index of the datetime within the model updates.

        Returns:
            List of row dictionaries, one per (ResourcePool, cell) pair.
        """
        rows: list[dict[str, object]] = []

        for array_pool in resource_pools:
            array_name = array_pool.resource.pool_array
            pft_name = array_pool.pft or ""

            # The position of each entry in elemental_masses is used as the cell id.
            rows.extend(
                self._pool_row(
                    time,
                    time_index,
                    "resource_array",
                    array_name,
                    "",
                    pft_name,
                    cell_id,
                    carbon,
                    nitrogen,
                    phosphorus,
                )
                for cell_id, (carbon, nitrogen, phosphorus) in enumerate(
                    array_pool.elemental_masses
                )
            )

        return rows