"""The :mod:`~virtual_ecosystem.core.grid` module is used to create the grid of cells
underlying the simulation and to identify the neighbourhood connections of cells.
- set up neighbourhoods. ? store as graph (networkx - might only need a really
lightweight graph description).
- import of geojson grids? Way to link structured landscape into cells. Can use
data loading methods to assign values to grids? This would be a useful way of
defining mappings though.
- maybe look at libpysal if we end up needing more weights/spatial analysis stuff?
https://pysal.org/libpysal/
""" # noqa: D205
from __future__ import annotations
import json
from collections.abc import Callable, Sequence
from typing import Any
import numpy as np
from numpy.typing import NDArray
from scipy.spatial.distance import cdist, pdist, squareform # type: ignore
from shapely import GeometryCollection, Point, Polygon, STRtree # type: ignore
from shapely.affinity import scale, translate # type: ignore
from virtual_ecosystem.core.exceptions import ConfigurationError
from virtual_ecosystem.core.logger import LOGGER
from virtual_ecosystem.core.model_config import GridConfiguration
GRID_REGISTRY: dict[str, Callable] = {}
"""A registry for different grid geometries.
This dictionary maps grid geometry types (square, hex, etc) to a function generating a
grid of that type. Users can register their own grid types using the `register_grid`
decorator.
"""
type GRID_STRUCTURE_SIG = tuple[list[int], list[Polygon]]
"""Type signature of the data structure to be returned from grid creator functions.
The first value is a list of integer cell ids, the second is a matching list of the
polygons for each cell id. Although cell ids could be a numpy array, the numpy int types
then need handling in arguments and json representation.
"""
[docs]
def register_grid(grid_type: str) -> Callable:
"""Add a grid type and creator function to the grid registry.
This decorator is used to add a function creating a grid layout to the registry of
accepted grids. The function must return a numpy array of integer polygon ids and an
equal length list of Polygon objects, following the GRID_STRUCTURE_SIG signature.
The grid_type argument is used to identify the grid creation function to be used by
the Grid class and in configuration files.
Args:
grid_type: A name to be used to identify the grid creation function.
"""
def decorator_register_grid(func: Callable) -> Callable:
# Add the grid type to the registry
if grid_type in GRID_REGISTRY:
LOGGER.warning(
"Grid type %s already exists and is being replaced", grid_type
)
GRID_REGISTRY[grid_type] = func
return func
return decorator_register_grid
[docs]
@register_grid(grid_type="square")
def make_square_grid(
cell_area: float,
cell_nx: int,
cell_ny: int,
xoff: float = 0,
yoff: float = 0,
) -> GRID_STRUCTURE_SIG:
"""Create a square grid.
Args:
cell_area: The area of each hexagon cell in m2
cell_nx: The number of grid cells in the X direction.
cell_ny: The number of grid cells in the Y direction.
xoff: An offset to use for the grid origin in the X direction in metres.
yoff: An offset to use for the grid origin in the Y direction in metres.
Returns:
Equal-length tuples of integer polygon ids and Polygon objects
"""
# Create the polygon prototype object, with origin at 0,0 and area 1
# Note coordinate order is anti-clockwise - right hand rule.
prototype = Polygon([[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]])
# Scale to requested size and origin
scale_factor = np.sqrt(cell_area)
prototype = scale(prototype, xfact=scale_factor, yfact=scale_factor, origin=(0, 0))
prototype = translate(prototype, xoff=xoff, yoff=yoff)
# Get the cell ids and centres of the cells
idx_y, idx_x = np.indices((cell_ny, cell_nx))
cell_ids = idx_x + idx_y * cell_nx
cell_x = idx_x * scale_factor
cell_y = np.flipud(idx_y) * scale_factor
# Get the list of polygons
cell_polygon_list: list[Polygon] = [
translate(prototype, xoff=xf, yoff=yf)
for xf, yf in zip(cell_x.flatten(), cell_y.flatten())
]
# Get list of ids
cell_ids_list: list[int] = cell_ids.flatten().tolist()
return cell_ids_list, cell_polygon_list
[docs]
@register_grid(grid_type="hexagon")
def make_hex_grid(
cell_area: float,
cell_nx: int,
cell_ny: int,
xoff: float = 0,
yoff: float = 0,
) -> GRID_STRUCTURE_SIG:
"""Create a hexagonal grid.
Args:
cell_area: The area of each hexagon cell in m2.
cell_nx: The number of grid cells in the X direction.
cell_ny: The number of grid cells in the Y direction.
xoff: An offset to use for the grid origin in the X direction in metres.
yoff: An offset to use for the grid origin in the Y direction in metres.
Returns:
Equal-length tuples of integer polygon ids and Polygon objects
"""
# TODO - implement grid orientation and kwargs passing
# https://www.redblobgames.com/grids/hexagons/
# Create the polygon prototype object, with origin at 0,0 and area 1
# Note coordinate order is anti-clockwise - right hand rule
side_length_a1 = 3 ** (1 / 4) * np.sqrt(2 / 9)
apothem_a1 = np.sqrt(3) * side_length_a1 / 2
prototype = Polygon(
[
[apothem_a1, 0],
[2 * apothem_a1, side_length_a1 * 0.5],
[2 * apothem_a1, side_length_a1 * 1.5],
[apothem_a1, side_length_a1 * 2],
[0, side_length_a1 * 1.5],
[0, side_length_a1 * 0.5],
[apothem_a1, 0],
]
)
# Scale to requested size and origin and get side length and apothem for translation
# when tiling the grid
scale_factor = np.sqrt(cell_area)
side_length = 3 ** (1 / 4) * np.sqrt(2 * (cell_area / 9))
apothem = np.sqrt(3) * side_length / 2
prototype = scale(prototype, xfact=scale_factor, yfact=scale_factor, origin=(0, 0))
prototype = translate(prototype, xoff=xoff, yoff=yoff)
# Get the cell ids and centres of the cells
idx_y, idx_x = np.indices((cell_ny, cell_nx))
cell_ids = idx_x + idx_y * cell_nx
cell_x = 2 * apothem * idx_x + apothem * (idx_y % 2)
cell_y = 1.5 * side_length * np.flipud(idx_y)
# Get the list of polygons
cell_polygon_list: list[Polygon] = [
translate(prototype, xoff=xf, yoff=yf)
for xf, yf in zip(cell_x.flatten(), cell_y.flatten())
]
# Get list of ids
cell_ids_list: list[int] = cell_ids.flatten().tolist()
return cell_ids_list, cell_polygon_list
[docs]
class Grid:
"""Define the grid of cells used in a Virtual Ecosystem simulation.
The simulation grid used in a Virtual Ecosystem simulation is assumed to be a
projected coordinate system with linear dimensions in metres. Grid cell sizes are
set using their area in square metres and users can specify offsets to align a
simulation grid to a particular projected coordinate system. However, the Virtual
Ecosystem codebase makes no attempt to manage or validate projection information:
we assume that users maintain a common coordinate system across inputs.
Args:
grid_type: The grid type to be used, which must identify a grid creation
function in the :data:`~virtual_ecosystem.core.grid.GRID_REGISTRY`
dictionary.
cell_area: The area of each grid cell, in square metres.
cell_nx: The number of cells in the grid along the x (easting) axis
cell_ny: The number of cells in the grid along the y (northing) axis
xoff: An offset for the grid x origin in metres
yoff: An offset for the grid y origin in metres
"""
def __init__(
self,
grid_type: str = "square",
cell_area: float = 10000,
cell_nx: int = 10,
cell_ny: int = 10,
xoff: float = 0,
yoff: float = 0,
) -> None:
# Populate the attributes
self.grid_type = grid_type
"""The grid type used to create the instance"""
self.cell_area = cell_area
"""The area of the grid cells"""
self.cell_nx = cell_nx
"""The number of cells in the grid in the X dimension"""
self.cell_ny = cell_ny
"""The number of cells in the grid in the Y dimension"""
self.xoff = xoff
"""An offset for the cell X coordinates"""
self.yoff = yoff
"""An offset for the cell Y coordinates"""
self.cell_id: list[int]
"""A list of unique integer ids for each cell."""
self.polygons: list[Polygon]
"""A list of of the cell polygon geometries, as shapely.geometry.Polygon
objects, in cell_id order"""
self.ncells: int
"""The total number of cells in the grid."""
self.centroids: np.ndarray
"""A list of the centroid of each cell as shapely.geometry.Point objects, in
cell_id order."""
self._strtree: STRtree
"""An STRtree object of the grid polygons, used for searching coordinates
matches in loading data."""
# Retrieve the creator function from the grid registry and handle unknowns
creator = GRID_REGISTRY.get(self.grid_type, None)
if creator is None:
raise ValueError(f"The grid_type {self.grid_type} is not defined.")
# Run the grid creation
self.cell_id, self.polygons = creator(
cell_area=self.cell_area,
cell_nx=self.cell_nx,
cell_ny=self.cell_ny,
xoff=self.xoff,
yoff=self.yoff,
)
if len(self.cell_id) != len(self.polygons):
raise ValueError(
f"The {self.grid_type} creator function generated ids and polygons of "
"unequal length."
)
self.n_cells = len(self.cell_id)
# Get the centroids as a numpy array
centroids = [cell.centroid for cell in self.polygons]
self.centroids = np.array([(gm.xy[0][0], gm.xy[1][0]) for gm in centroids])
# Populate the STRtree
self._strtree = STRtree(self.polygons)
# Get the bounds as a 4 tuple
self.bounds: GeometryCollection = GeometryCollection(self.polygons).bounds
"""A GeometryCollection providing the bounds of the cell polygons."""
# Define other attributes set by methods
# TODO - this might become a networkx graph
self._neighbours: list[NDArray[np.int_]] | None = None
# Do not by default store the full distance matrix
self._distances: NDArray[np.floating] | None = None
@property
def neighbours(self) -> list[NDArray[np.int_]]:
"""Return the neighbours property."""
if self._neighbours is None:
raise AttributeError("Neighbours not yet defined: use set_neighbours.")
return self._neighbours
[docs]
def __repr__(self) -> str:
"""Represent a CoreGrid as a string."""
return (
"CoreGrid("
f"{self.grid_type}, "
f"A={self.cell_area}, "
f"nx={self.cell_nx}, "
f"ny={self.cell_ny}, "
f"n={self.n_cells}, "
f"bounds={self.bounds})"
)
[docs]
@classmethod
def from_config(cls, config: GridConfiguration) -> Grid:
"""Factory function to generate a Grid instance from a configuration dict.
Args:
config: A validated Virtual Ecosystem model configuration object.
"""
# The GRID_REGISTRY is dynamic, so can only enforce the grid type checking at
# the point of Grid instance creation.
if config.grid_type not in GRID_REGISTRY:
LOGGER.error(f"The grid_type {config.grid_type} is not defined.")
to_raise = ConfigurationError("Grid creation from configuration failed.")
LOGGER.critical(to_raise)
raise to_raise
try:
grid = Grid(**config.model_dump())
except Exception as err:
LOGGER.error(err)
to_raise = ConfigurationError("Grid creation from configuration failed.")
LOGGER.critical(to_raise)
raise to_raise
LOGGER.info("Grid created from configuration.")
return grid
[docs]
def dumps(self, dp: int = 2, **kwargs: Any) -> str:
"""Export a grid as a GeoJSON string.
The virtual_ecosystem.core.Grid object assumes an unspecified projected
coordinate system. As a result, GeoJSON files created by this export method do
not strictly obey the GeoJSON specification
(https://www.rfc-editor.org/rfc/rfc7946), which requires WGS84 coordinates to
describe locations.
Args:
dp: The decimal place precision for exported coordinates
**kwargs: Arguments to json.dumps
"""
content = self._get_geojson(dp=dp)
return json.dumps(obj=content, **kwargs)
[docs]
def dump(self, outfile: str, dp: int = 2, **kwargs: Any) -> None:
"""Export a grid as a GeoJSON file.
The virtual_ecosystem.core.Grid object assumes an unspecified projected
coordinate system. As a result, GeoJSON files created by this export method do
not strictly obey the GeoJSON specification
(https://www.rfc-editor.org/rfc/rfc7946), which requires WGS84 coordinates to
describe locations.
Args:
outfile: A path used to export GeoJSON data.
dp: The decimal place precision for exported coordinates
**kwargs: Arguments to json.dump
"""
content = self._get_geojson(dp=dp)
with open(outfile, "w") as outf:
json.dump(obj=content, fp=outf, **kwargs)
def _get_geojson(self, dp: int) -> dict:
"""Convert the grid to a GeoJSON structured dictionary.
Args:
dp: The number of decimal places to use in reporting locations
"""
# Create the output feature list
features = []
for idx, poly in zip(self.cell_id, self.polygons):
# Get the coordinates of the outer ring - we are not expecting any holes in
# grid cell polygons - and wrap in a list to provide Polygon structure.
coords = [np.round(poly.exterior.coords, decimals=dp).tolist()]
feature = {
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": coords,
},
"properties": {
"cell_id": idx,
"cell_cx": np.round(self.centroids[idx][0], decimals=dp),
"cell_cy": np.round(self.centroids[idx][1], decimals=dp),
},
}
features.append(feature)
return {"type": "FeatureCollection", "features": features}
[docs]
def set_neighbours(
self,
edges: bool = True,
vertices: bool = False,
distance: float | None = None,
) -> None:
"""Populate the neighbour list for a Grid object.
This method generates a list giving the neighbours of each cell in the grid.
The edges and vertices arguments are used to include neighbouring cells that
share edges or vertices with a focal cell. Alternatively, a distance in metres
from the focal cell centroid can be used to include neighbouring cells
within that distance.
Args:
edges: Include cells with shared edges as neighbours.
vertices: Include cells with shared vertices as neighbours.
distance: A distance in metres.
"""
# This is a lot more irritating to implement than expected. Using geometry
# operations (as in Shapely.touches and pysal.weights.Queen/etc) turns out to be
# unreliable for hexagon grids simply due to floating point differences. For the
# moment, just implementing distance.
if distance is not None:
self._neighbours = [
np.where(self.get_distances(idx, self.cell_id) <= distance)[1]
for idx in self.cell_id
]
[docs]
def get_distances(
self,
cell_from: int | Sequence[int] | None,
cell_to: int | Sequence[int] | None,
) -> NDArray[np.floating]:
"""Calculate euclidean distances between cell centroids.
This method returns a two dimensional np.array containing the Euclidean
distances between two sets of cell ids.
Args:
cell_from: Either a single integer cell_id or a list of ids.
cell_to: Either a single integer cell_id or a list of ids.
Returns:
A 2D np.array of Euclidean distances
"""
if cell_from is None:
_cell_from: NDArray[np.int_] = np.arange(self.n_cells)
else:
_cell_from = np.array(
[cell_from] if isinstance(cell_from, int) else cell_from
)
if cell_to is None:
_cell_to: NDArray[np.int_] = np.arange(self.n_cells)
else:
_cell_to = np.array([cell_to] if isinstance(cell_to, int) else cell_to)
if self._distances is None:
return cdist(self.centroids[_cell_from], self.centroids[_cell_to])
return self._distances[np.ix_(_cell_from, _cell_to)]
[docs]
def populate_distances(
self,
) -> None:
"""Populate the complete cell distance matrix for the grid.
This stores the full distance matrix in the Grid instance, which is then used
for quick lookup by the get_distance method. However for large grids, this
requires a lot of memory, and so calculating distances on demand may be more
reasonable.
"""
self._distances = squareform(pdist(self.centroids))
[docs]
def map_xy_to_cell_id(
self,
x_coords: np.ndarray,
y_coords: np.ndarray,
) -> list[list[int]]:
"""Map a set of coordinates onto grid cells.
This function loops over points defined by pairs of x and y coordinates and maps
the coordinates onto the cell_ids of the grid. The method also checks to see
that each point intersects one and only one of the cell polygons defined in the
grid. Points that intersect no cells fall outside the grid polygons and points
that intersect more than one cell fall ambiguously on cell borders.
Args:
x_coords: A numpy array of x coordinates of points that should occur within
grid cells.
y_coords: A similar and equal-length array providing y coordinates.
Returns:
A list of lists showing the cell ids that map onto each point. The list for
a given point can be empty - when the point falls in no cell - or of length
> 1 when a point falls on adjoining cell boundaries.
"""
if (x_coords.ndim != 1) or (y_coords.ndim != 1):
raise ValueError("The x/y coordinate arrays are not 1 dimensional")
if x_coords.shape != y_coords.shape:
raise ValueError("The x/y coordinates are of unequal length")
# Get shapely points for the coordinates
xyp = [Point(x, y) for x, y in zip(x_coords, y_coords)]
# Map the Cell ID of each point - this is doing all pairs of points and cells,
# which is a computational choke point. Might be able to use a spatial search
# option, if this gets problematic, possibly including an STRTree in the grid
# object https://shapely.readthedocs.io/en/latest/strtree.html
return [
self._strtree.query(geometry=pt, predicate="intersects").tolist()
for pt in xyp
]
[docs]
def map_xy_to_cell_indexing(
self,
x_coords: np.ndarray,
y_coords: np.ndarray,
x_idx: np.ndarray | None,
y_idx: np.ndarray | None,
) -> tuple[np.ndarray, np.ndarray]:
"""Returns indexing to map xy coordinates a single cell_id axis.
This function maps the provided one-dimensional set of x and y points onto the
grid (using `~virtual_ecosystem.core.grid.Grid.map_xy_to_cell_id`) and then
checks that the mapped points provide a one-to-one mapping onto the grid cells.
The function then returns a pair of arrays that give indices on the original
x and y data to extract data along a single cell id axis. Because the inputs are
expected to be flattened onto a single dimension, the function also accepts x
and y index values that allow the cells to be mapped back into original
dimensions. If these are not provided, the coordinates are are assumed to have
come from a one-dimensional structure and so these indices are simple sequences
along `x_coords` and `y_coords`.
Args:
x_coords: A numpy array of x coordinates of points that should occur within
grid cells.
y_coords: A similar and equal-length array providing y coordinates.
x_idx: A numpy array providing original indices along the x-axis
y_idx: A numpy array providing original indices along the y-axis
Returns:
A list giving the integer cell id for each pair of points.
"""
# Get the coordinate mapping to cell ids
cell_map = self.map_xy_to_cell_id(x_coords=x_coords, y_coords=y_coords)
# Set indexing to sequence along coords if missing
if (x_idx is None) ^ (y_idx is None): # Note: ^ is xor
raise ValueError("Only one of x/y indices provided.")
# Generate internal x and y indices.
if (x_idx is not None) and (y_idx is not None):
_x_idx = x_idx
_y_idx = y_idx
else:
_x_idx = np.arange(x_coords.shape[0])
_y_idx = np.arange(y_coords.shape[0])
# Check the shapes of the indices.
if (_x_idx.shape != x_coords.shape) or (_y_idx.shape != y_coords.shape):
raise ValueError("Dimensions of x/y indices do not match coordinates")
# Find the set of total number of cell mappings per point
cell_counts = {len(mp) for mp in cell_map}
# Raise an exception where not all coords fall in a grid cell
if 0 in cell_counts:
raise ValueError("Mapped points fall outside grid.")
# Values greater than 1 indicate coordinates on cell edges
if any([c > 1 for c in cell_counts]):
raise ValueError("Mapped points fall on cell boundaries.")
# Now all points are 1 to 1 with cells so collapse down to list of ints
cell_id_map = [c[0] for c in cell_map]
# Get a list of all mapped cell ids.
cells_found = [v for v in cell_id_map]
# Now check for cells with more than one point and cells with no points.
if set(cells_found) != set(self.cell_id):
raise ValueError("Mapped points do not cover all cells.")
if len(cells_found) != self.n_cells:
raise ValueError("Some cells contain more than one point.")
# Sort indices into cell map order
cell_order = np.argsort(cell_id_map)
return _x_idx[cell_order], _y_idx[cell_order]