Source code for reVX.setbacks.base

# -*- coding: utf-8 -*-
"""
Base classes for setback exclusion computation
"""
import os
import logging
from copy import deepcopy
from warnings import warn
from math import floor, ceil
from itertools import product
from abc import abstractmethod
from concurrent.futures import as_completed

import numpy as np
import geopandas as gpd
from shapely.ops import unary_union
from shapely.validation import make_valid
from rasterio import (windows, transform, Affine, coords,
                      features as rio_features)

from rex.utilities import log_mem
from rex.utilities.execution import SpawnProcessPool
from reV.handlers.exclusions import ExclusionLayers
from reVX.handlers.geopackage import GPKGMeta
from reVX.utilities.exclusions import AbstractBaseExclusionsMerger
from reVX.setbacks.functions import (parcel_buffer, positive_buffer,
                                     features_clipped_to_county,
                                     features_with_centroid_in_county)

logger = logging.getLogger(__name__)


BUFFERS = {
    "default": positive_buffer,
    "parcel": parcel_buffer,
}
"""Types of buffers available for setback calculations. """


FEATURE_FILTERS = {
    "centroid": features_with_centroid_in_county,
    "clip": features_clipped_to_county,
}
"""Types of feature filters available for setback calculations. """


[docs]class Rasterizer: """Helper class to rasterize shapes.""" def __init__(self, excl_fpath, weights_calculation_upscale_factor, hsds=False): """ Parameters ---------- excl_fpath : str Path to .h5 file containing template layers. The raster will match the shape and profile of these layers. weights_calculation_upscale_factor : int If this value is an int > 1, the output will be a layer with **inclusion** weight values (floats ranging from 0 to 1). Note that this is backwards w.r.t the typical output of exclusion integer values (1 for excluded, 0 otherwise). Values <= 1 will still return a standard exclusion mask. For example, a cell that was previously excluded with a a boolean mask (value of 1) may instead be converted to an inclusion weight value of 0.75, meaning that 75% of the area corresponding to that point should be included (i.e. the exclusion feature only intersected a small portion - 25% - of the cell). This percentage inclusion value is calculated by upscaling the output array using this input value, rasterizing the exclusion features onto it, and counting the number of resulting sub-cells excluded by the feature. For example, setting the value to 3 would split each output cell into nine sub-cells - 3 divisions in each dimension. After the feature is rasterized on this high-resolution sub-grid, the area of the non-excluded sub-cells is totaled and divided by the area of the original cell to obtain the final inclusion percentage. Therefore, a larger upscale factor results in more accurate percentage values. If ``None`` (or a value <= 1), this process is skipped and the output is a boolean exclusion mask. By default ``None``. """ props = _parse_excl_properties(excl_fpath, hsds=hsds) self._shape, self._profile = props self._scale_factor = weights_calculation_upscale_factor or 1 self._scale_factor = int(self._scale_factor // 1) @property def profile(self): """Geotiff profile. Returns ------- dict """ return self._profile @property def transform(self): """rasterio.Affine: Affine transform for exclusion layer. """ return Affine(*self.profile["transform"]) @property def arr_shape(self): """Rasterize array shape. Returns ------- tuple """ return self._shape @property def inclusions(self): """Flag indicating wether or not the output raster represents inclusion values. Returns ------- bool """ return self._scale_factor > 1 def _no_exclusions_array(self, multiplier=1, window=None): """Get an array of the correct shape representing no exclusions. The array contains all zeros, and a new one is created for every function call. Parameters ---------- multiplier : int, optional Integer multiplier value used to scale up the dimensions of the array exclusions array (e.g. multiplier of 3 turns an array of shape (10, 20) into an array of shape (30, 60)). window : :cls:`rasterio.windows.Window` A ``rasterio`` window defining the area of the raster. Can be used to speed up computation and decrease memory requirements if features are localized to a small portion of the raster array. Returns ------- np.array Array of zeros representing no exclusions. """ if window is None: shape = tuple(x * multiplier for x in self.arr_shape[1:]) else: shape = (window.height * multiplier, window.width * multiplier) return np.zeros(shape, dtype='uint8')
[docs] def rasterize(self, shapes, window=None): """Convert geometries into exclusions array. Parameters ---------- shapes : list, optional List of geometries to rasterize (i.e. list(gdf["geometry"])). If `None` or empty list, returns array of zeros. window : :obj:`rasterio.windows.Window` A ``rasterio`` window defining the area of the raster. Can be used to speed up computation and decrease memory requirements if features are localized to a small portion of the raster array. Returns ------- arr : ndarray Rasterized array of shapes. """ shapes = shapes or [] shapes = [(geom, 1) for geom in shapes if geom is not None] if self.inclusions: return self._rasterize_to_weights(shapes, window) return self._rasterize_to_mask(shapes, window)
def _rasterize_to_weights(self, shapes, window): """Rasterize features to weights using a high-resolution array.""" if not shapes: return ((1 - self._no_exclusions_array(window=window)) .astype(np.float32)) hr_arr = self._no_exclusions_array(multiplier=self._scale_factor, window=window) transform = self._window_transform(window) transform *= transform.scale(1 / self._scale_factor) rio_features.rasterize(shapes=shapes, out=hr_arr, fill=0, transform=transform) arr = self._aggregate_high_res(hr_arr, window) return 1 - (arr / self._scale_factor ** 2) def _rasterize_to_mask(self, shapes, window): """Rasterize features with to an exclusion mask.""" arr = self._no_exclusions_array(window=window) if shapes: transform = self._window_transform(window) rio_features.rasterize(shapes=shapes, out=arr, fill=0, transform=transform) return arr def _aggregate_high_res(self, hr_arr, window): """Aggregate the high resolution exclusions array to output shape. """ arr = self._no_exclusions_array(window=window).astype(np.float32) for i, j in product(range(self._scale_factor), range(self._scale_factor)): arr += hr_arr[i::self._scale_factor, j::self._scale_factor] return arr def _window_transform(self, window): """Calculate the transform for a given window, if any. """ if window is None: return deepcopy(self.transform) return windows.transform(window, self.transform)
[docs]class AbstractBaseSetbacks(AbstractBaseExclusionsMerger): """Base class for Setbacks Calculators""" def __init__(self, excl_fpath, regulations, features, hsds=False, weights_calculation_upscale_factor=None): """ Parameters ---------- excl_fpath : str Path to .h5 file containing exclusion layers, will also be the location of any new setback layers regulations : `~reVX.setbacks.regulations.SetbackRegulations` A `SetbackRegulations` object used to extract setback distances. features : str Path to file containing features to compute exclusions from. hsds : bool, optional Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS behind HSDS. By default `False`. weights_calculation_upscale_factor : int, optional If this value is an int > 1, the output will be a layer with **inclusion** weight values instead of exclusion booleans. For example, a cell that was previously excluded with a a boolean mask (value of 1) may instead be converted to an inclusion weight value of 0.75, meaning that 75% of the area corresponding to that point should be included (i.e. the exclusion feature only intersected a small portion - 25% - of the cell). This percentage inclusion value is calculated by upscaling the output array using this input value, rasterizing the exclusion features onto it, and counting the number of resulting sub-cells excluded by the feature. For example, setting the value to `3` would split each output cell into nine sub-cells - 3 divisions in each dimension. After the feature is rasterized on this high-resolution sub-grid, the area of the non-excluded sub-cells is totaled and divided by the area of the original cell to obtain the final inclusion percentage. Therefore, a larger upscale factor results in more accurate percentage values. If `None` (or a value <= 1), this process is skipped and the output is a boolean exclusion mask. By default `None`. """ self._rasterizer = Rasterizer(excl_fpath, weights_calculation_upscale_factor, hsds) super().__init__(excl_fpath, regulations, features, hsds) self._features_meta = GPKGMeta(self._features) def __repr__(self): msg = "{} for {}".format(self.__class__.__name__, self._excl_fpath) return msg @property def description(self): """str: Description to be added to excl H5.""" return ('{} computed with a base setback distance of {} and a ' 'multiplier of {} for a total generic setback value of {} ' '(local exclusions may differ).' .format(self.__class__, self._regulations.base_setback_dist, self._regulations.multiplier, self._regulations.generic)) @property def no_exclusions_array(self): """np.array: Array representing no exclusions. """ return self._rasterizer.rasterize(shapes=None) @property def exclusion_merge_func(self): """callable: Function to merge overlapping exclusion layers. """ return np.minimum if self._rasterizer.inclusions else np.maximum
[docs] def pre_process_regulations(self): """Reduce regulations to state corresponding to features.""" feats_crs = self._features_meta.crs xmin, ymin, xmax, ymax = self._features_meta.bbox regulations_df = self.regulations_table.to_crs(feats_crs) regulations_df = regulations_df.cx[xmin:xmax, ymin:ymax] regulations_df = regulations_df.to_crs(crs=self.profile['crs']) self._regulations.df = regulations_df.reset_index(drop=True) mask = self._regulation_table_mask() if not mask.any(): msg = "Found no local regulations!" logger.warning(msg) warn(msg) self._regulations.df = (self.regulations_table[mask] .reset_index(drop=True)) logger.debug('Loaded and processed setback regulations for {} counties' .format(len(self.regulations_table)))
def _local_exclusions_arguments(self, regulation_value, county): """Compile and return arguments to `compute_local_exclusions`. """ logger.debug("Selecting county IDs using bounds {}" .format(county.total_bounds)) county = (county.buffer(regulation_value * 1.1) .to_crs(self._features_meta.crs)) ids = self._features_meta.feat_ids_for_bbox(county.total_bounds) logger.debug("Calculating setbacks for counties with IDs {}" .format(ids)) for start in range(0, len(ids), self.NUM_FEATURES_PER_WORKER): end = start + self.NUM_FEATURES_PER_WORKER yield (ids[start:end], self._features, self._features_meta.primary_key_column, self.profile['crs'], self.FEATURE_FILTER_TYPE, self.BUFFER_TYPE, self._rasterizer)
[docs] @staticmethod def compute_local_exclusions(regulation_value, county, *args): """Compute local features setbacks. This method will compute the setbacks using a county-specific regulations file that specifies either a static setback or a multiplier value that will be used along with the base setback distance to compute the setback. Parameters ---------- regulation_value : float | int Setback distance in meters. county : geopandas.GeoDataFrame Regulations for a single county. features_ids : iterable of ints List of tuple (or other iterable) of integer values corresponding to the ID of the features in the GeoPackage to load and compute exclusions for. Note that these ID values are the internal SQL table ID's stored with the features, NOT the index of the features when loaded using :func:`geopandas.read_file`. features_fp : path-like Path to the GeoPackage file containing the features to be loaded and used for the exclusion calculation. col : str Namer of the primary key column in the main SQL table of the GeoPackage. This should be the name of the column under which the `features_ids` can be found. crs : str String representation of the Coordinate Reference System of the output exclusions array. features_filter_type : str Key from the :attr:`FEATURE_FILTERS` dictionary that points to the feature filter function to use. This feature filter function filters the loaded features such that they are localized to the county bounds. buffer_type : str Key from the :attr:`BUFFERS` dictionary that points to the feature buffer function to use. rasterizer : Rasterizer Instance of `Rasterizer` class used to rasterize the buffered county features. Returns ------- setbacks : ndarray Raster array of setbacks slices : 2-tuple of `slice` X and Y slice objects defining where in the original array the exclusion data should go. """ (features_ids, features_fp, col, crs, features_filter_type, buffer_type, rasterizer) = args logger.debug('- Computing setbacks for county FIPS {}' .format(county.iloc[0]['FIPS'])) log_mem(logger) features = _load_features(features_ids, features_fp, col, crs) feature_bounds = _buffered_feature_bounds(features, rasterizer, regulation_value) features = FEATURE_FILTERS[features_filter_type](features, county) features = BUFFERS[buffer_type](features, regulation_value) return _rasterize_within_window(features, feature_bounds, rasterizer)
[docs] def compute_generic_exclusions(self, max_workers=None): """Compute generic setbacks. This method will compute the setbacks using a generic setback of `base_setback_dist * multiplier`. Parameters ---------- max_workers : int, optional Number of workers to use for exclusions computation, if 1 run in serial, if > 1 run in parallel with that many workers, if `None` run in parallel on all available cores. By default `None`. Returns ------- setbacks : ndarray Raster array of setbacks """ generic_regulations_dne = (self._regulations.generic is None or np.isclose(self._regulations.generic, 0)) if generic_regulations_dne: return self.no_exclusions_array max_workers = max_workers or os.cpu_count() ids = self._features_meta.feat_ids num_feats = self._features_meta.num_feats pk = self._features_meta.primary_key_column crs = self.profile['crs'] exclusions = None if max_workers > 1: msg = ("Computing generic setbacks from {:,} features using {} " "workers".format(num_feats, max_workers)) logger.debug(msg) loggers = [__name__, 'reVX', 'rex'] spp_kwargs = {"max_workers": max_workers, "loggers": loggers} with SpawnProcessPool(**spp_kwargs) as exe: exclusions = self._compute_generic_exclusions_in_chunks( exe, max_workers, ids, pk, crs) else: logger.info("Computing generic setbacks from {} features in " "serial.".format(num_feats)) for start in range(0, len(ids), self.NUM_FEATURES_PER_WORKER): end = start + self.NUM_FEATURES_PER_WORKER out = _compute_exclusions(ids[start:end], self._features, pk, crs, self.BUFFER_TYPE, self._regulations.generic, self._rasterizer) new_exclusions, slices = out exclusions = self._combine_exclusions(exclusions, new_exclusions, slices=slices) msg = ("Computed generic setbacks for {:,}/{:,} features" .format(end, num_feats)) logger.info(msg) if exclusions is None: exclusions = self.no_exclusions_array return exclusions
def _compute_generic_exclusions_in_chunks(self, exe, max_submissions, ids, pk, crs): """Compute exclusions in parallel using futures. """ futures, exclusions = {}, None futures = [] start_inds = range(0, len(ids), self.NUM_FEATURES_PER_WORKER) for ind, start in enumerate(start_inds, start=1): end = start + self.NUM_FEATURES_PER_WORKER future = exe.submit(_compute_exclusions, ids[start:end], self._features, pk, crs, self.BUFFER_TYPE, self._regulations.generic, self._rasterizer) futures.append(future) if ind % max_submissions == 0: exclusions = self._collect_ge_futures(futures, exclusions) msg = ("Computed generic setbacks for {:,}/{:,} features" .format(end, len(ids))) logger.info(msg) exclusions = self._collect_ge_futures(futures, exclusions) return exclusions def _collect_ge_futures(self, futures, exclusions): """Collect all futures from the input dictionary. """ logger.debug(f"Collecting {len(futures)} futures...") log_mem(logger) for future in as_completed(futures): new_exclusions, slices = future.result() exclusions = self._combine_exclusions(exclusions, new_exclusions, slices=slices) futures.clear() logger.debug("Finished collecting futures chunk!") log_mem(logger) return exclusions def _regulation_table_mask(self): """Return the regulation table mask for setback feature. """ features = (self.regulations_table['Feature Type'] .isin(self.FEATURE_TYPES)) not_excluded = ~(self.regulations_table['Feature Subtype'] .isin(self.FEATURE_SUBTYPES_TO_EXCLUDE)) return features & not_excluded @property @abstractmethod def FEATURE_TYPES(self): """set: Feature type names using in the regulations file. """ raise NotImplementedError @property @abstractmethod def FEATURE_SUBTYPES_TO_EXCLUDE(self): """set: Feature subtype names to exclude from regulations file. """ raise NotImplementedError @property @abstractmethod def BUFFER_TYPE(self): """str: Key in `BUFFERS` pointing to buffer to use. """ raise NotImplementedError @property @abstractmethod def FEATURE_FILTER_TYPE(self): """str: Key in `FEATURE_FILTERS` pointing to feature filter to use. """ raise NotImplementedError @property @abstractmethod def NUM_FEATURES_PER_WORKER(self): """int: Number of features each worker processes at one time. """ raise NotImplementedError
def _parse_excl_properties(excl_fpath, hsds=False): """Parse shape, chunk size, and profile from exclusions file. Parameters ---------- excl_fpath : str Path to .h5 file containing exclusion layers, will also be the location of any new setback layers hsds : bool, optional Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS behind HSDS. By default `False`. Returns ------- shape : tuple Shape of exclusions datasets profile : str GeoTiff profile for exclusions datasets """ with ExclusionLayers(excl_fpath, hsds=hsds) as exc: dset_shape = exc.shape profile = exc.profile if len(dset_shape) < 3: dset_shape = (1, ) + dset_shape logger.debug('Exclusions properties:\n' 'shape : {}\n' 'profile : {}\n' .format(dset_shape, profile)) return dset_shape, profile def _load_features(features_ids, features_fp, col, crs): """Load the `features_ids` from the `features_fp`. """ ids = ",".join(map(str, features_ids)) logger.debug(" Loading {} features from {}".format(len(features_ids), features_fp)) features = gpd.read_file(features_fp, where="{} in ({})".format(col, ids)) features = features.to_crs(crs=crs) features["geometry"] = features.apply(_make_row_shape_valid, axis=1) logger.debug("Loaded {} features".format(len(features))) logger.debug("Features total bounds: {}".format(features.total_bounds)) log_mem(logger) return features def _make_row_shape_valid(row): """Make a row shape valid using shapely `make_valid`""" return unary_union(make_valid(row["geometry"])) def _compute_exclusions(features_ids, features_fp, col, crs, buffer_type, setback, rasterizer): """Compute exclusions by loading features, buffering, and rasterizing. """ setbacks, feature_bounds = _load_and_buffer(features_ids, features_fp, col, crs, buffer_type, setback, rasterizer) if setbacks is None: return None, None return _rasterize_within_window(setbacks, feature_bounds, rasterizer) def _load_and_buffer(features_ids, features_fp, col, crs, buffer_type, setback, rasterizer): """Load features and immediately buffer them. The intention is to keep loading and buffering in one function so that large sets of features get dropped immediately instead of hanging in memory during rasterization. """ features = _load_features(features_ids, features_fp, col, crs) excl_array_bbox = transform.array_bounds(*rasterizer.arr_shape[1:], rasterizer.transform) if coords.disjoint_bounds(excl_array_bbox, features.total_bounds): return None, None logger.debug(f"Buffering {len(features)} features...") setbacks = BUFFERS[buffer_type](features, setback) feature_bounds = _buffered_feature_bounds(features, rasterizer, setback) return setbacks, feature_bounds def _buffered_feature_bounds(features, rasterizer, regulation_value): """Calculate the buffered feature bounds""" buffer_len = max(abs(rasterizer.transform.a), abs(rasterizer.transform.e)) bound_buffer = regulation_value * 2 + buffer_len return features.buffer(bound_buffer).total_bounds def _rasterize_within_window(features, bounds, rasterizer): """Rasterize the features using the geoseries bounding box as a window. """ window = _cropped_window(bounds, rasterizer.transform, rasterizer.arr_shape[1:]) logger.debug(f"Rasterizing {len(features)} features using {window!r}") exclusions = rasterizer.rasterize(features, window=window) log_mem(logger) logger.debug(f"Exclusion mem size: {exclusions.nbytes / 1048576:.2f}MB") return exclusions, window.toslices() def _cropped_window(bounds, raster_transform, shape): """Calculate the raster array window corresponding to the bounding box.""" left, bottom, right, top = bounds rows, cols = transform.rowcol(raster_transform, [left, right, right, left], [top, top, bottom, bottom], op=float) row_start = max(floor(min(rows)), 0) col_start = max(floor(min(cols)), 0) row_stop = min(ceil(max(rows)), shape[0]) col_stop = min(ceil(max(cols)), shape[1]) return windows.Window(col_off=col_start, row_off=row_start, width=max(col_stop - col_start, 0), height=max(row_stop - row_start, 0))