Source code for sup3r.pipeline.slicer

"""Slicer class for chunking forward pass input"""

import logging
from dataclasses import dataclass
from typing import Union

import numpy as np

from sup3r.pipeline.utilities import (
    get_chunk_slices,
)
from sup3r.preprocessing.utilities import _parse_time_slice, log_args

logger = logging.getLogger(__name__)


[docs] @dataclass class ForwardPassSlicer: """Get slices for sending data chunks through generator. Parameters ---------- coarse_shape : tuple Shape of full domain for low res data time_steps : int Number of time steps for full temporal domain of low res data. This is used to construct a dummy_time_index from np.arange(time_steps) time_slice : slice | list Slice to use to extract range from time_index. Can be a ``slice(start, stop, step)`` or list ``[start, stop, step]`` chunk_shape : tuple Max shape (spatial_1, spatial_2, temporal) of an unpadded coarse chunk to use for a forward pass. The number of nodes that the ForwardPassStrategy is set to distribute to is calculated by dividing up the total time index from all file_paths by the temporal part of this chunk shape. Each node will then be parallelized accross parallel processes by the spatial chunk shape. If temporal_pad / spatial_pad are non zero the chunk sent to the generator can be bigger than this shape. If running in serial set this equal to the shape of the full spatiotemporal data volume for best performance. s_enhance : int Spatial enhancement factor t_enhance : int Temporal enhancement factor spatial_pad : int Size of spatial overlap between coarse chunks passed to forward passes for subsequent spatial stitching. This overlap will pad both sides of the fwp_chunk_shape. Note that the first and last chunks in any of the spatial dimension will not be padded. temporal_pad : int Size of temporal overlap between coarse chunks passed to forward passes for subsequent temporal stitching. This overlap will pad both sides of the fwp_chunk_shape. Note that the first and last chunks in the temporal dimension will not be padded. """ coarse_shape: Union[tuple, list] time_steps: int s_enhance: int t_enhance: int time_slice: slice temporal_pad: int spatial_pad: int chunk_shape: Union[tuple, list] @log_args def __post_init__(self): self.dummy_time_index = np.arange(self.time_steps) self.time_slice = _parse_time_slice(self.time_slice) self._chunk_lookup = None self._s1_lr_slices = None self._s2_lr_slices = None self._s1_lr_pad_slices = None self._s2_lr_pad_slices = None self._s_lr_slices = None self._s_lr_pad_slices = None self._s_lr_crop_slices = None self._t_lr_pad_slices = None self._t_lr_crop_slices = None self._s_hr_slices = None self._s_hr_crop_slices = None self._t_hr_crop_slices = None self._hr_crop_slices = None
[docs] def get_spatial_slices(self): """Get spatial slices for small data chunks that are passed through generator Returns ------- s_lr_slices: list List of slices for low res data chunks which have not been padded. data_handler.data[s_lr_slice] corresponds to an unpadded low res input to the model. s_lr_pad_slices : list List of slices which have been padded so that high res output can be stitched together. data_handler.data[s_lr_pad_slice] corresponds to a padded low res input to the model. s_hr_slices : list List of slices for high res data corresponding to the lr_slices regions. output_array[s_hr_slice] corresponds to the cropped generator output. """ return (self.s_lr_slices, self.s_lr_pad_slices, self.s_hr_slices)
[docs] def get_time_slices(self): """Calculate the number of time chunks across the full time index Returns ------- t_lr_slices : list List of low-res non-padded time index slices. e.g. If fwp_chunk_size[2] is 5 then the size of these slices will always be 5. t_lr_pad_slices : list List of low-res padded time index slices. e.g. If fwp_chunk_size[2] is 5 the size of these slices will be 15, with exceptions at the start and end of the full time index. """ return self.t_lr_slices, self.t_lr_pad_slices
@property def s_lr_slices(self): """Get low res spatial slices for small data chunks that are passed through generator Returns ------- _s_lr_slices : list List of spatial slices corresponding to the unpadded spatial region going through the generator """ if self._s_lr_slices is None: self._s_lr_slices = [ (s1, s2) for s1 in self.s1_lr_slices for s2 in self.s2_lr_slices ] return self._s_lr_slices @property def s_lr_pad_slices(self): """Get low res padded slices for small data chunks that are passed through generator Returns ------- _s_lr_pad_slices : list List of slices which have been padded so that high res output can be stitched together. Each entry in this list has a slice for each spatial dimension. data_handler.data[s_lr_pad_slice] gives the padded data volume passed through the generator """ if self._s_lr_pad_slices is None: self._s_lr_pad_slices = [ (s1, s2) for s1 in self.s1_lr_pad_slices for s2 in self.s2_lr_pad_slices ] return self._s_lr_pad_slices @property def t_lr_pad_slices(self): """Get low res temporal padded slices for distributing time chunks across nodes. These slices correspond to the time chunks sent to each node and are padded according to temporal_pad. Returns ------- _t_lr_pad_slices : list List of low res temporal slices which have been padded so that high res output can be stitched together """ if self._t_lr_pad_slices is None: self._t_lr_pad_slices = self.get_padded_slices( self.t_lr_slices, self.time_steps, 1, self.temporal_pad, self.time_slice.step, ) return self._t_lr_pad_slices @property def t_lr_crop_slices(self): """Get low res temporal cropped slices for cropping time index of padded input data. Returns ------- _t_lr_crop_slices : list List of low res temporal slices for cropping padded input data """ if self._t_lr_crop_slices is None: self._t_lr_crop_slices = self.get_cropped_slices( self.t_lr_slices, self.t_lr_pad_slices, 1 ) return self._t_lr_crop_slices @property def t_hr_crop_slices(self): """Get high res temporal cropped slices for cropping forward pass output before stitching together Returns ------- _t_hr_crop_slices : list List of high res temporal slices for cropping padded generator output """ hr_crop_start = None hr_crop_stop = None if self.temporal_pad > 0: hr_crop_start = self.t_enhance * self.temporal_pad hr_crop_stop = -hr_crop_start if self._t_hr_crop_slices is None: # don't use self.get_cropped_slices() here because temporal padding # gets weird at beginning and end of timeseries and the temporal # axis should always be evenly chunked. self._t_hr_crop_slices = [ slice(hr_crop_start, hr_crop_stop) for _ in range(len(self.t_lr_slices)) ] return self._t_hr_crop_slices @property def s1_hr_slices(self): """Get high res spatial slices for first spatial dimension""" return self.get_hr_slices(self.s1_lr_slices, self.s_enhance) @property def s2_hr_slices(self): """Get high res spatial slices for second spatial dimension""" return self.get_hr_slices(self.s2_lr_slices, self.s_enhance) @property def s_hr_slices(self): """Get high res slices for indexing full generator output array Returns ------- _s_hr_slices : list List of high res slices. Each entry in this list has a slice for each spatial dimension. output[hr_slice] gives the superresolved domain corresponding to data_handler.data[lr_slice] """ if self._s_hr_slices is None: self._s_hr_slices = [] self._s_hr_slices = [ (s1, s2) for s1 in self.s1_hr_slices for s2 in self.s2_hr_slices ] return self._s_hr_slices @property def s_lr_crop_slices(self): """Get low res cropped slices for cropping input chunk domain Returns ------- _s_lr_crop_slices : list List of low res cropped slices. Each entry in this list has a slice for each spatial dimension. """ if self._s_lr_crop_slices is None: self._s_lr_crop_slices = [] s1_crop_slices = self.get_cropped_slices( self.s1_lr_slices, self.s1_lr_pad_slices, 1 ) s2_crop_slices = self.get_cropped_slices( self.s2_lr_slices, self.s2_lr_pad_slices, 1 ) self._s_lr_crop_slices = [ (s1, s2) for s1 in s1_crop_slices for s2 in s2_crop_slices ] return self._s_lr_crop_slices @property def s_hr_crop_slices(self): """Get high res cropped slices for cropping generator output Returns ------- _s_hr_crop_slices : list List of high res cropped slices. Each entry in this list has a slice for each spatial dimension. """ hr_crop_start = None hr_crop_stop = None if self.spatial_pad > 0: hr_crop_start = self.s_enhance * self.spatial_pad hr_crop_stop = -hr_crop_start if self._s_hr_crop_slices is None: self._s_hr_crop_slices = [] s1_hr_crop_slices = [ slice(hr_crop_start, hr_crop_stop) for _ in range(len(self.s1_lr_slices)) ] s2_hr_crop_slices = [ slice(hr_crop_start, hr_crop_stop) for _ in range(len(self.s2_lr_slices)) ] self._s_hr_crop_slices = [ (s1, s2) for s1 in s1_hr_crop_slices for s2 in s2_hr_crop_slices ] return self._s_hr_crop_slices @property def hr_crop_slices(self): """Get high res spatiotemporal cropped slices for cropping generator output Returns ------- _hr_crop_slices : list List of high res spatiotemporal cropped slices. Each entry in this list has a crop slice for each spatial dimension and temporal dimension and then slice(None) for the feature dimension. model.generate()[hr_crop_slice] gives the cropped generator output corresponding to outpuUnion[np.ndarray, da.core.Array][hr_slice] """ if self._hr_crop_slices is None: self._hr_crop_slices = [] for t in self.t_hr_crop_slices: node_slices = [ (s[0], s[1], t, slice(None)) for s in self.s_hr_crop_slices ] self._hr_crop_slices.append(node_slices) return self._hr_crop_slices @property def s1_lr_pad_slices(self): """List of low resolution spatial slices with padding for first spatial dimension""" if self._s1_lr_pad_slices is None: self._s1_lr_pad_slices = self.get_padded_slices( self.s1_lr_slices, self.coarse_shape[0], 1, padding=self.spatial_pad, ) return self._s1_lr_pad_slices @property def s2_lr_pad_slices(self): """List of low resolution spatial slices with padding for second spatial dimension""" if self._s2_lr_pad_slices is None: self._s2_lr_pad_slices = self.get_padded_slices( self.s2_lr_slices, self.coarse_shape[1], 1, padding=self.spatial_pad, ) return self._s2_lr_pad_slices @property def s1_lr_slices(self): """List of low resolution spatial slices for first spatial dimension considering padding on all sides of the spatial raster.""" ind = slice(0, self.coarse_shape[0]) slices = get_chunk_slices( self.coarse_shape[0], self.chunk_shape[0], index_slice=ind ) return slices @property def s2_lr_slices(self): """List of low resolution spatial slices for second spatial dimension considering padding on all sides of the spatial raster.""" ind = slice(0, self.coarse_shape[1]) slices = get_chunk_slices( self.coarse_shape[1], self.chunk_shape[1], index_slice=ind ) return slices @property def t_lr_slices(self): """Low resolution temporal slices""" n_tsteps = len(self.dummy_time_index[self.time_slice]) n_chunks = n_tsteps / self.chunk_shape[2] n_chunks = int(np.ceil(n_chunks)) ti_slices = self.dummy_time_index[self.time_slice] ti_slices = np.array_split(ti_slices, n_chunks) ti_slices = [ slice(c[0], c[-1] + 1, self.time_slice.step) for c in ti_slices ] return ti_slices
[docs] @staticmethod def get_hr_slices(slices, enhancement, step=None): """Get high resolution slices for temporal or spatial slices Parameters ---------- slices : list Low resolution slices to be enhanced enhancement : int Enhancement factor step : int | None Step size for slices Returns ------- hr_slices : list High resolution slices """ hr_slices = [] if step is not None: step *= enhancement for sli in slices: start = sli.start * enhancement stop = sli.stop * enhancement hr_slices.append(slice(start, stop, step)) return hr_slices
@property def chunk_lookup(self): """Get a 3D array with shape (n_spatial_1_chunks, n_spatial_2_chunks, n_time_chunks) where each value is the chunk index.""" if self._chunk_lookup is None: n_s1 = len(self.s1_lr_slices) n_s2 = len(self.s2_lr_slices) n_t = self.n_time_chunks lookup = np.arange(self.n_chunks).reshape((n_t, n_s1, n_s2)) self._chunk_lookup = np.transpose(lookup, axes=(1, 2, 0)) return self._chunk_lookup @property def spatial_chunk_lookup(self): """Get a 2D array with shape (n_spatial_1_chunks, n_spatial_2_chunks) where each value is the spatial chunk index.""" n_s1 = len(self.s1_lr_slices) n_s2 = len(self.s2_lr_slices) return np.arange(self.n_spatial_chunks).reshape((n_s1, n_s2)) @property def n_spatial_chunks(self): """Get the number of spatial chunks""" return len(self.hr_crop_slices[0]) @property def n_time_chunks(self): """Get the number of temporal chunks""" return len(self.t_hr_crop_slices) @property def n_chunks(self): """Get total number of spatiotemporal chunks""" return self.n_spatial_chunks * self.n_time_chunks
[docs] @staticmethod def get_padded_slices(slices, shape, enhancement, padding, step=None): """Get padded slices with the specified padding size, max shape, enhancement, and step size Parameters ---------- slices : list List of low res unpadded slice shape : int max possible index of a padded slice. e.g. if the slices are indexing a dimension with size 10 then a padded slice cannot have an index greater than 10. enhancement : int Enhancement factor. e.g. If these slices are indexing a spatial dimension which will be enhanced by 2x then enhancement=2. padding : int Padding factor. e.g. If these slices are indexing a spatial dimension and the spatial_pad is 10 this is 10. It will be multiplied by the enhancement factor if the slices are to be used to index an enhanced dimension. step : int | None Step size for slices. e.g. If these slices are indexing a temporal dimension and time_slice.step = 3 then step=3. Returns ------- list Padded slices for temporal or spatial dimensions. """ step = step or 1 pad = step * padding * enhancement pad_slices = [] for _, s in enumerate(slices): start = np.max([0, s.start * enhancement - pad]) end = np.min([enhancement * shape, s.stop * enhancement + pad]) pad_slices.append(slice(start, end, step)) return pad_slices
[docs] @staticmethod def get_cropped_slices(unpadded_slices, padded_slices, enhancement): """Get cropped slices to cut off padded output Parameters ---------- unpadded_slices : list List of unpadded slices padded_slices : list List of padded slices enhancement : int Enhancement factor for the data to be cropped. Returns ------- list Cropped slices for temporal or spatial dimensions. """ cropped_slices = [] for ps, us in zip(padded_slices, unpadded_slices): start = us.start stop = us.stop step = us.step or 1 if start is not None: start = enhancement * (us.start - ps.start) // step if stop is not None: stop = enhancement * (us.stop - ps.stop) // step if start is not None and start <= 0: start = None if stop is not None and stop >= 0: stop = None cropped_slices.append(slice(start, stop)) return cropped_slices