Source code for virtual_ecosystem.core.data

"""The :mod:`~virtual_ecosystem.core.data` module handles the population and storage of
data sources used to run Virtual Ecosystem simulations.

The Data class
==============

The core :class:`~virtual_ecosystem.core.data.Data` class is used to store data for the
variables used in a simulation. It can be used both for data from external sources - for
example, data used to set the initial environment or time series of inputs - and for
internal variables used in the simulation. The class behaves like a dictionary - so data
can be retrieved and set using ``data_object['varname']`` - but also provide validation
for data being added to the object.

All data added to the class is stored in a :class:`~xarray.Dataset` object, and data
extracted from the object will be a :class:`~xarray.DataArray`. The ``Dataset`` can also
be accessed directly using the :attr:`~virtual_ecosystem.core.data.Data.data` attribute
of the class instance to use any of the :class:`~xarray.Dataset` class methods.

When data is added to a :class:`~virtual_ecosystem.core.data.Data` instance, it is
automatically validated against the configuration of a simulation before being added to
the :attr:`~virtual_ecosystem.core.data.Data.data` attribute. The validation process
also stores information that allows models to can confirm that a given variable has been
successfully validated.

The core of the :class:`~virtual_ecosystem.core.data.Data` class is the
:meth:`~virtual_ecosystem.core.data.Data.__setitem__` method. This method provides the
following functionality:

* It allows a ``DataArray`` to be added to a :class:`~virtual_ecosystem.core.data.Data`
  instance using the ``data['varname'] = data_array`` syntax.
* It applies the validation step using the
  :func:`~virtual_ecosystem.core.axes.validate_dataarray` function. See the
  :mod:`~virtual_ecosystem.core.axes` module for the details of the validation process,
  including the :class:`~virtual_ecosystem.core.axes.AxisValidator` class and the
  concept of core axes.
* It inserts the data into the :class:`~xarray.Dataset` instance stored in the
  :attr:`~virtual_ecosystem.core.data.Data.data` attribute.
* Lastly, it records the data validation details in the
  :attr:`~virtual_ecosystem.core.data.Data.variable_validation` attribute.

The :class:`~virtual_ecosystem.core.data.Data` class also provides three shorthand
methods to get information and data from an instance.

* The :meth:`~virtual_ecosystem.core.data.Data.__contains__` method tests if a named
  variable is included in the internal :class:`~xarray.Dataset` instance.

    .. code-block:: python

        # Equivalent code 'varname' in data 'varname' in data.data

* The :meth:`~virtual_ecosystem.core.data.Data.__getitem__` method is used to retrieve
  a named variable from the internal :class:`~xarray.Dataset` instance.

    .. code-block:: python

        # Equivalent code data['varname'] data.data['varname']

* The :meth:`~virtual_ecosystem.core.data.Data.on_core_axis` method queries the
  :attr:`~virtual_ecosystem.core.data.Data.variable_validation` attribute to confirm
  that a named variable has been validated on a named axis.

    .. code-block:: python

        # Test that the temperature variable has been validated on the spatial axis
        data.on_core_axis('temperature', 'spatial')

Adding data from a file
-----------------------

The general solution for programmatically adding data from a file is to:

* manually open a data file using an appropriate reader packages for the format,
* coerce data from named variables into properly structured :class:`~xarray.DataArray`
  objects, and then
* use the :meth:`~virtual_ecosystem.core.data.Data.__setitem__` method to validate and
  add it to a :class:`~virtual_ecosystem.core.data.Data` instance.

The  :func:`~virtual_ecosystem.core.readers.load_to_dataarray` implements data loading
to a DataArray for some known file formats, using file reader functions described in the
:mod:`~virtual_ecosystem.core.readers` module. See the details of that module for
supported formats and for extending the system to additional file formats.

.. code-block:: python

    # Load temperature data from a supported file
    from virtual_ecosystem.core.readers import load_to_dataarray
    results = load_to_dataarray(
        '/path/to/supported/format.nc', var_names=['temperature']
    )
    data['temperature'] = results['temperature']

Using a data configuration
--------------------------

A :class:`~virtual_ecosystem.core.data.Data` instance can also be populated using the
:meth:`~virtual_ecosystem.core.data.Data.load_data_config` method. This is expecting to
take a properly validated configuration object, typically created from TOML files
(see :class:`~virtual_ecosystem.core.config_builder.ConfigurationLoader`). The expected
structure of the data configuration section within those TOML files is as follows:

.. code-block:: toml

    [[core.data.variable]]
    file_path="/path/to/file.nc"
    var_name="precip"
    [[core.data.variable]]
    file_path="/path/to/file.nc"
    var_name="temperature"
    [[core.data.variable]]
    file_path="/path/to/a/different/file.nc"
    var_name="elev"

You can include ```core.data.variable``` tags in different files. This can be useful to
group model-specific data with other model configuration options, and allow
configuration files to be swapped in a more modular fashion. However, the data
configurations across all files **must not** contain repeated data variable names.

.. code-block:: python

    # Load configured datasets
    data.load_data_config(config)

"""  # noqa: D205

from itertools import groupby
from pathlib import Path
from typing import Any

import dask
import numpy as np
from xarray import DataArray, Dataset, open_mfdataset

from virtual_ecosystem.core.axes import AXIS_VALIDATORS, validate_dataarray
from virtual_ecosystem.core.core_components import ModelTiming
from virtual_ecosystem.core.exceptions import ConfigurationError
from virtual_ecosystem.core.grid import Grid
from virtual_ecosystem.core.logger import LOGGER
from virtual_ecosystem.core.model_config import CoreConfiguration
from virtual_ecosystem.core.readers import load_to_dataarray
from virtual_ecosystem.core.utils import check_outfile

# There are ongoing xarray issues with NetCDF not being thread safe and this causes
# segfaults on different architectures in testing using `xarray.open_mfdataset`
# See:
# - https://github.com/pydata/xarray/issues/7079
# - https://github.com/pydata/xarray/issues/3961
#
# Following advice on both those issues, we currently explicitly stop dask from trying
# to use parallel file processing and use open_mfdataset(..., lock=False)
dask.config.set(scheduler="single-threaded")


# TODO: Model timing is currently used when writing the data to file to provide the
#       datestamps of the time_index dimension. This should probably be passed to
#       Data.__init__ so that it is available for all methods.


[docs] class Data: """The Virtual Ecosystem data object. This class holds data for a Virtual Ecosystem simulation. It functions like a dictionary but the class extends the dictionary methods to provide common methods for data validation etc and to hold key attributes, such as the underlying spatial grid. Args: grid: The Grid instance that will be used for simulation. Raises: TypeError: when grid is not a Grid object """ def __init__(self, grid: Grid) -> None: # Set up the instance properties if not isinstance(grid, Grid): to_raise = TypeError("Data must be initialised with a Grid object") LOGGER.critical(to_raise) raise to_raise # Local import to avoid circular import issue from virtual_ecosystem.core.variables import ( VariableMetadata, load_known_variables, ) self.known_variables: dict[str, VariableMetadata] = load_known_variables() """A dictionary of known variables.""" self.grid: Grid = grid """The configured Grid to be used in a simulation.""" self.data = Dataset() """The :class:`~xarray.Dataset` used to store data.""" self.variable_validation: dict[str, dict[str, str | None]] = {} """Records validation details for loaded variables. The validation details for each variable is stored in this dictionary using the variable name as a key. The validation details are a dictionary, keyed using core axis names, of the :class:`~virtual_ecosystem.core.axes.AxisValidator` subclass applied to that axis. If no validator was applied, the entry for that core axis will be ``None``. """
[docs] def __repr__(self) -> str: """Returns a representation of a Data instance.""" if self.data: return f"Data: {list(self.data.data_vars)}" return "Data: no variables loaded"
[docs] def __setitem__(self, key: str, value: DataArray) -> None: """Load a data array into a Data instance. This method takes an input {class}`~xarray.DataArray` object and then matches the dimension and coordinates signature of the array to find a loading routine given the grid used in the {class}`virtual_ecosystem.core.data.Data` instance. That routine is used to validate the DataArray and then add the DataArray to the {class}`~xarray.Dataset` object or replace the existing DataArray under that key. Note that the DataArray name is expected to match the standard internal variable names used in Virtual Ecosystem and this is enforced against the dictionary of known variables. The method also adds unit and description metadata to from the known variables database to attributes as they are written to the data object. Args: key: The name to store the data under value: The DataArray to be stored Raises: TypeError: when the value is not a DataArray. """ if not isinstance(value, DataArray): to_raise = TypeError( "Only DataArray objects can be added to Data instances" ) LOGGER.critical(to_raise) raise to_raise if key not in self.known_variables: msg = f"Attempt to add unknown variable to data: '{key}'" LOGGER.critical(msg) raise ValueError(msg) if key not in self.data.data_vars: LOGGER.info(f"Adding data array for '{key}'") else: LOGGER.info(f"Replacing data array for '{key}'") # Add variable_metadata from known variables database - these needs to be done # for both adding and replacing variables as the science models do not attempt # to persist array attributes during calculations. variable = self.known_variables[key] value.attrs.update({"unit": variable.unit, "description": variable.description}) # Validate and store the data array value, valid_dict = validate_dataarray(value=value, grid=self.grid) self.data[key] = value self.variable_validation[key] = valid_dict
[docs] def __getitem__(self, key: str) -> DataArray: """Get a given data variable from a Data instance. This method looks for the provided key in the data variables saved in the `data` attribute and returns the DataArray for that variable. Note that this is just a shortcut: ``data_instance['var']`` is the same as ``data_instance.data['var']``. Args: key: The name of the data variable to get Raises: KeyError: if the data variable is not present """ return self.data[key]
[docs] def __contains__(self, key: str) -> bool: """Check if a given data variable is present in a Data instance. This method provides the `var_name in data_instance` functionality for a Data instance. This is just a shortcut: ``var in data_instance`` is the same as ``var in data_instance.data``. Args: key: A data variable name """ return key in self.data
[docs] def on_core_axis(self, var_name: str, axis_name: str) -> bool: """Check core axis validation. This function checks if a given variable loaded into a Data instance has been validated on one of the core axes. Args: var_name: The name of a variable axis_name: The core axis name Returns: A boolean indicating if the variable was validated on the named axis. Raises: ValueError: Either an unknown variable or core axis name or that the variable validation data in the Data instance does not include the variable, which would be an internal programming error. """ if var_name not in self.data: raise ValueError(f"Unknown variable name: {var_name}") if var_name not in self.variable_validation: raise ValueError(f"Missing variable validation data: {var_name}") if axis_name not in AXIS_VALIDATORS: raise ValueError(f"Unknown core axis name: {axis_name}") if self.variable_validation[var_name][axis_name] is None: return False return True
[docs] def load_data_config(self, config: CoreConfiguration) -> None: """Setup the simulation data from a user configuration. This is a method is used to validate a provided user data configuration and populate the Data instance object from the provided data sources. The data_config dictionary can contain a 'variable' key containing an array of dictionaries providing the path to the file (``file_path``) and the name of the variable within the file (``var_name``). The function groups variables by their source file path, so that each file is only opened once to load the requested variables. Args: config: A validated Virtual Ecosystem model configuration object. """ LOGGER.info("Loading data from configuration") # Track errors in loading multiple files from a configuration data_config = config.data # The previous code here tested for "constant" and "generator" data types, but # since those are not yet implemented, this has been dropped # Handle variables if len(data_config.variable) == 0: LOGGER.warning("No data sources defined in the data configuration.") return clean_load = True # Check what name the data will be saved under but do then carry on to check # for other loading problems data_var_names = [var.var_name for var in data_config.variable] dupl_names = {str(md) for md in data_var_names if data_var_names.count(md) > 1} if dupl_names: LOGGER.error("Duplicate variable names in data configuration.") clean_load = False # Group variables by file variables = list(data_config.variable) variables.sort(key=lambda var: var.file_path) file_groups = groupby(variables, key=lambda var: var.file_path) # Load data from each data source for file, file_vars in file_groups: # Attempt to load the file, trapping exceptions as critical logger # messages and defer failure until the whole configuration has been # processed try: loaded_data = load_to_dataarray( file=Path(file), var_names=[var.var_name for var in file_vars], ) except Exception as err: LOGGER.error(str(err)) clean_load = False else: for var_name, data_array in loaded_data.items(): self[var_name] = data_array if not clean_load: msg = "Data configuration did not load cleanly - check log" LOGGER.critical(msg) raise ConfigurationError(msg)
[docs] def save_to_netcdf( self, output_file_path: Path, timing: ModelTiming, variables_to_save: list[str] | None = None, ) -> None: """Save the contents of the data object as a NetCDF file. Either the whole contents of the data object or specific variables of interest can be saved using this function. Args: output_file_path: Path location to save the Virtual Ecosystem model state. timing: The ModelTiming instance for the simulation variables_to_save: List of variables to be saved. If not provided then all variables are saved. """ # Check that the folder to save to exists and that there isn't already a file # saved there check_outfile(output_file_path) # If the file path is okay then write the model state out as a NetCDF. Should # check if all variables should be saved or just the requested ones. if variables_to_save: out = self.data[variables_to_save] else: out = self.data # Add the timestamps to the output out["timestamp"] = DataArray(timing.update_datestamps, dims="time_index") out.to_netcdf(output_file_path)
[docs] def save_timeslice_to_netcdf( self, output_file_path: Path, variables_to_save: list[str], time_index: int, timestamp: np.datetime64, ) -> None: """Save specific variables from current state of data as a NetCDF file. At present, this function save each time step individually. In future, this function might be altered to append multiple time steps at once, as this could improve performance significantly. Args: output_file_path: Path location to save NetCDF file to. variables_to_save: List of variables to save in the file time_index: The time index of the slice being saved timestamp: The timestamp of the start of the timeslice Raises: ConfigurationError: If the file to save to can't be found """ # Check that the folder to save to exists and that there isn't already a file # saved there check_outfile(output_file_path) # Loop over variables adding them to the new dataset time_slice = ( self.data[variables_to_save] .expand_dims({"time_index": 1}) .assign_coords(time_index=[time_index]) ) # Add the timestamp time_slice["timestamp"] = DataArray([timestamp], dims="time_index") # Save and close new dataset time_slice.to_netcdf(Path(output_file_path)) time_slice.close()
[docs] def add_from_dict(self, output_dict: dict[str, DataArray]) -> None: """Update data object from dictionary of variables. This function takes a dictionary of updated variables to replace the corresponding variables in the data object. If a variable is not in data, it is added. This will need to be reassessed as the model evolves; TODO we might want to split the function in strict 'replace' and 'add' functionalities. Args: output_dict: dictionary of variables from submodule Returns: an updated data object for the current time step """ for variable in output_dict: self[variable] = output_dict[variable]
[docs] def output_current_state( self, variables_to_save: list[str], output_directory_path: Path, time_index: int, timestamp: np.datetime64, ) -> Path: """Method to output the current state of the data object. This function outputs all variables stored in the data object, except for any data with a "time_index" dimension defined (at present only climate input data has this). This data can either be saved as a new file or appended to an existing file. Args: variables_to_save: List of variables to save output_directory_path: The output directory for the current state data. time_index: The index representing the current time step in the data object. timestamp: The timestamp of the start of the timeslice Raises: ConfigurationError: If the final output directory doesn't exist, isn't a directory, or the final output file already exists (when in new file mode). If the file to append to is missing (when not in new file mode). Returns: A path to the file that the current state is saved in """ # Create output file path for specific time index out_path = output_directory_path / f"continuous_state{time_index:05}.nc" # Save the required variables by appending to existing file self.save_timeslice_to_netcdf( output_file_path=out_path, variables_to_save=variables_to_save, time_index=time_index, timestamp=timestamp, ) return out_path
[docs] def merge_continuous_data_files( merged_file_path: Path, continuous_data_files: list[Path] ) -> None: """Merge all continuous data files in a folder into a single file. This function deletes all of the continuous output files it has been asked to merge once the combined output is saved. Args: merged_file_path: The output file name for the merged continuous data. continuous_data_files: Files containing previously output continuous data Raises: ConfigurationError: If output folder doesn't exist or if it output file already exists """ # Check that output file doesn't already exist check_outfile(merged_file_path) # Open all files as a single dataset with open_mfdataset(continuous_data_files, lock=False) as all_data: # Specify type of the layer roles object to allow for quicker saving by dask all_data["layer_roles"] = all_data["layer_roles"].astype("S9") # Save and close complete dataset all_data.to_netcdf(merged_file_path) # Iterate over all continuous files and delete them for file_path in continuous_data_files: file_path.unlink()
[docs] class DataGenerator: """Generate artificial data. Currently just a signature sketch. """ def __init__( self, # grid: GRID, spatial_axis: str, temporal_axis: str, temporal_interpolation: np.timedelta64, seed: int | None, method: str, # one of the numpy.random.Generator methods **kwargs: Any, ) -> None: pass