Source code for esmf_regrid.experimental.partition

"""Provides an interface for splitting up a large regridding task."""

import esmpy
import numpy as np

from esmf_regrid.constants import Constants
from esmf_regrid.experimental._partial import PartialRegridder
from esmf_regrid.experimental.io import load_regridder, save_regridder
from esmf_regrid.schemes import _get_grid_dims


def _get_chunk(cube, sl):
    if cube.mesh is None:
        grid_dims = _get_grid_dims(cube)
    else:
        grid_dims = (cube.mesh_dim(),)
    full_slice = [np.s_[:]] * len(cube.shape)
    for s, d in zip(sl, grid_dims, strict=True):
        full_slice[d] = np.s_[s[0] : s[1]]
    return cube[*full_slice]


def _determine_blocks(shape, chunks, num_chunks, explicit_blocks):
    which_inputs = (
        chunks is not None,
        num_chunks is not None,
        explicit_blocks is not None,
    )
    if sum(which_inputs) == 0:
        msg = "Partition blocks must must be specified by either chunks, num_chunks, or explicit_chunks."
        raise ValueError(msg)
    if sum(which_inputs) > 1:
        msg = "Potentially conflicting partition block definitions."
        raise ValueError(msg)
    if num_chunks is not None:
        chunks = [s // n for s, n in zip(shape, num_chunks, strict=True)]
        for chunk in chunks:
            if chunk == 0:
                msg = "`num_chunks` cannot divide a dimension into more blocks than the size of that dimension."
                raise ValueError(msg)
    if chunks is not None:
        if all(isinstance(x, int) for x in chunks):
            proper_chunks = []
            for s, c in zip(shape, chunks, strict=True):
                proper_chunk = [c] * (s // c)
                if s % c != 0:
                    proper_chunk += [s % c]
                proper_chunks.append(proper_chunk)
            chunks = proper_chunks
        for s, chunk in zip(shape, chunks, strict=True):
            if sum(chunk) != s:
                msg = "Chunks must sum to the size of their respective dimension."
                raise ValueError(msg)
        bounds = [np.cumsum([0, *chunk]) for chunk in chunks]
        if len(bounds) == 1:
            msg = "Chunks must have exactly two dimensions."
            raise ValueError(msg)
            # TODO: This is currently blocked by the fact that slicing an Iris cube on its mesh dimension
            #  does not currently yield another cube with a mesh. When this is fixed, the following
            #  code can be uncommented and the noqa on the following line can be removed.
            # explicit_blocks = [
            #     [[int(lower), int(upper)]]
            #     for lower, upper in zip(bounds[0][:-1], bounds[0][1:], strict=True)
            # ]
        elif len(bounds) == 2:  # noqa: RET506
            explicit_blocks = [
                [[int(ly), int(uy)], [int(lx), int(ux)]]
                for ly, uy in zip(bounds[0][:-1], bounds[0][1:], strict=True)
                for lx, ux in zip(bounds[1][:-1], bounds[1][1:], strict=True)
            ]
        else:
            msg = "Chunks must not exceed two dimensions."
            raise ValueError(msg)
    if len(explicit_blocks[0]) != len(shape):
        msg = "Dimensionality of blocks does not match the number of dimensions."
        raise ValueError(msg)
    return explicit_blocks


[docs] class Partition: """Class for breaking down regridding into manageable chunks.""" def __init__( self, src, tgt, scheme, file_names, use_dask_src_chunks=False, src_chunks=None, num_src_chunks=None, explicit_src_blocks=None, auto_generate=False, saved_files=None, ): """Class for breaking down regridding into manageable chunks. Parameters ---------- src : cube Source cube. tgt : cube Target cube. scheme : regridding scheme Regridding scheme to generate regridders, either ESMFAreaWeighted or ESMFBilinear. file_names : iterable of str A list of file names to save/load parts of the regridder to/from. use_dask_src_chunks : bool, default=False If true, partition using the same chunks from the source cube. src_chunks : numpy array, tuple of int or tuple of tuple of int, default=None Specify the size of blocks to use to divide up the cube. Dimensions are specified in y,x axis order. If `src_chunks` is a tuple of int, each integer describes the maximum size of a block in that dimension. If `src_chunks` is a tuple of tuples, each sub-tuple describes the size of each successive block in that dimension. The sum of these block sizes in each of the sub-tuples should add up to the total size of that dimension or else an error is raised. num_src_chunks : tuple of int Specify the number of blocks to use to divide up the cube. Dimensions are specified in y,x axis order. Each integer describes the number of blocks that dimension will be divided into. explicit_src_blocks : arraylike NxMx2 Explicitly specify the bounds of each block in the partition. Describes N blocks along M dimensions with a pair of upper and lower bounds. The upper and lower bounds describe a slice of an array, e.g. the bounds (3, 6) describe the indices 3, 4, 5 in a particular dimension. auto_generate : bool, default=False When true, start generating files on initialisation. saved_files : iterable of str A list of paths to previously saved files. Warnings -------- This class is still experimental. While we aim to maintain backwards compatibility where possible, there is no guarantee that the structure of any generated files will remain consistent and compatible with future versions. Note ---- The source is partitioned into blocks using one of the four mutually exclusive arguments, `use_dask_src_chunks`, `src_chunks`, `num_src_chunks`, or `explicit_src_blocks`. These describe a partition into a number of blocks which must equal the number of `file_names`. Currently, it is only possible to divide the source grid into chunks. Meshes are not yet supported as a source. """ if scheme._method == Constants.Method.NEAREST: msg = "The `Nearest` method is not implemented." raise NotImplementedError(msg) if scheme._method == Constants.Method.BILINEAR: pole_method = scheme.esmf_args.get("pole_method") if pole_method != esmpy.PoleMethod.NONE: msg = ( "Bilinear regridding must have a `pole_method` of `esmpy.PoleMethod.NONE` in " "the `esmf_args` in order for Partition to work.`" ) raise ValueError(msg) # TODO: Extract a slice of the cube. self.src = src if src.mesh is None: grid_dims = _get_grid_dims(src) else: msg = "Partition does not yet support source meshes." raise NotImplementedError(msg) # TODO: This is currently blocked by the fact that slicing an Iris cube on its mesh dimension # does not currently yield another cube with a mesh. When this is fixed, the following # code can be uncommented. # grid_dims = (src.mesh_dim(),) shape = tuple(src.shape[i] for i in grid_dims) self.tgt = tgt self.scheme = scheme # TODO: consider abstracting away the idea of files self.file_names = file_names if use_dask_src_chunks: if src_chunks is not None: msg = "`src_chunks` and `use_dask_src_chunks` cannot be used at the same time." raise ValueError(msg) if not src.has_lazy_data(): msg = "If `use_dask_src_chunks=True`, the source cube must be lazy." raise TypeError(msg) src_chunks = src.slices(grid_dims).next().lazy_data().chunks self.src_blocks = _determine_blocks( shape, src_chunks, num_src_chunks, explicit_src_blocks ) if len(self.src_blocks) != len(file_names): msg = "Number of source blocks does not match number of file names." raise ValueError(msg) # This will be controllable in future tgt_blocks = None self.tgt_blocks = tgt_blocks if tgt_blocks is not None: msg = "Target chunking not yet implemented." raise NotImplementedError(msg) # Note: this may need to become more sophisticated when both src and tgt are large self.file_block_dict = dict(zip(self.file_names, self.src_blocks, strict=True)) if saved_files is None: self.saved_files = [] else: self.saved_files = saved_files if auto_generate: self.generate_files(self.file_names) def __repr__(self): """Return a representation of the class.""" result = ( f"Partition(" f"src={self.src!r}, " f"tgt={self.tgt!r}, " f"scheme={self.scheme}, " f"num file_names={len(self.file_names)}," f"num saved_files={len(self.saved_files)})" ) return result @property def unsaved_files(self): """List of files not yet generated.""" return [file for file in self.file_names if file not in self.saved_files]
[docs] def generate_files(self, files_to_generate=None): """Generate files with regridding information. Parameters ---------- files_to_generate : int, default=None Specify the number of files to generate, default behaviour is to generate all files. """ if files_to_generate is None: files = self.unsaved_files else: if not isinstance(files_to_generate, int): msg = "`files_to_generate` must be an integer." raise ValueError(msg) files = self.unsaved_files[:files_to_generate] for file in files: src_block = self.file_block_dict[file] src = _get_chunk(self.src, src_block) tgt = self.tgt regridder = self.scheme.regridder(src, tgt) weights = regridder.regridder.weight_matrix regridder = PartialRegridder( src, tgt, src_block, None, weights, self.scheme ) save_regridder(regridder, file, allow_partial=True) self.saved_files.append(file)
[docs] def apply_regridders(self, cube, allow_incomplete=False): """Apply the saved regridders to a cube. Parameters ---------- allow_incomplete : bool, default=False If False, raise an error if not all files have been generated. If True, perform regridding using the files which have been generated. """ # for each target chunk, iterate through each associated regridder # for now, assume one target chunk if len(self.saved_files) == 0: msg = "No files have been generated." raise OSError(msg) if not allow_incomplete and len(self.unsaved_files) != 0: msg = "Not all files have been generated." raise OSError(msg) current_result = None current_weights = None files = self.saved_files for file, chunk in zip(self.file_names, self.src_blocks, strict=True): if file in files: next_regridder = load_regridder(file, allow_partial=True) cube_chunk = _get_chunk(cube, chunk) next_weights, next_result, extra = next_regridder.partial_regrid( cube_chunk ) if current_weights is None: current_weights = next_weights else: current_weights += next_weights if current_result is None: current_result = next_result else: current_result += next_result # NOTE: the final "finish_regridding" operation could be performed using any one # of the partial regridders,but the correct "corresponding" slice of the source # must be passed. # See :meth:`~esmf_regrid.experimental._partial.PartialRegridder.finish_regridding`. return next_regridder.finish_regridding( cube_chunk, # matches *this* partial regridder current_weights, current_result, extra, # should be *the same* for all the partial results )