Source code for reVX.utilities.exclusions

# -*- coding: utf-8 -*-
"""
Driver class to compute exclusions
"""
import h5py
import json
import os
import logging
from abc import ABC, abstractmethod
from concurrent.futures import as_completed
from warnings import warn

import numpy as np
import geopandas as gpd
import pandas as pd
from pyproj.crs import CRS
import rasterio
from rasterio import features as rio_features
from shapely.geometry import shape

from rex import Outputs
from rex.utilities import SpawnProcessPool, log_mem
from reV.handlers.exclusions import ExclusionLayers
from reVX.handlers.geotiff import Geotiff
from reVX.handlers.layered_h5 import LayeredH5
from reVX.utilities.utilities import log_versions

logger = logging.getLogger(__name__)


[docs]class AbstractExclusionCalculatorInterface(ABC): """Abstract Exclusion Calculator Interface. """ @property @abstractmethod def no_exclusions_array(self): """np.array: Array representing no exclusions. """ raise NotImplementedError @property @abstractmethod def exclusion_merge_func(self): """callable: Function to merge overlapping exclusion layers. """ raise NotImplementedError
[docs] @abstractmethod def pre_process_regulations(self): """Reduce regulations to correct state and features. When implementing this method, make sure to update `self._regulations.df`. """ raise NotImplementedError
@abstractmethod def _local_exclusions_arguments(self, regulation_value, county): """Compile and yield arguments to `compute_local_exclusions`. This method should yield lists or tuples of extra args to be passed to `compute_local_exclusions`. Do not include the `regulation_value` or `county`. Parameters ---------- regulation_value : float | int Regulation value for county. county : geopandas.GeoDataFrame Regulations for a single county. """ raise NotImplementedError
[docs] @staticmethod @abstractmethod def compute_local_exclusions(regulation_value, county, *args): """Compute local feature exclusions. This method should compute the exclusions using the information about the input county. Parameters ---------- regulation_value : float | int Regulation value for county. county : geopandas.GeoDataFrame Regulations for a single county. *args Other arguments required for local exclusion calculation. Returns ------- exclusions : np.ndarray Array of exclusions. slices : 2-tuple of `slice` X and Y slice objects defining where in the original array the exclusion data should go. """ raise NotImplementedError
[docs] @abstractmethod def compute_generic_exclusions(self, max_workers=None): """Compute generic exclusions. This method should compute the exclusions using a generic regulation value (`self._regulations.generic`). Parameters ---------- max_workers : int, optional Number of workers to use for exclusions computation, if 1 run in serial, if > 1 run in parallel with that many workers, if `None` run in parallel on all available cores. By default `None`. Returns ------- exclusions : ndarray Raster array of exclusions """ raise NotImplementedError
[docs]class AbstractBaseExclusionsMerger(AbstractExclusionCalculatorInterface): """ Create exclusions layers for exclusions """ def __init__(self, excl_fpath, regulations, features, hsds=False): """ Parameters ---------- excl_fpath : str Path to .h5 file containing exclusion layers, will also be the location of any new exclusion layers regulations : `~reVX.utilities.AbstractBaseRegulations` subclass A regulations object used to extract exclusion regulation values. features : str Path to file containing features to compute exclusions from. hsds : bool, optional Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS behind HSDS. By default `False`. """ log_versions(logger) self._excl_fpath = excl_fpath self._regulations = regulations self._features = features self._hsds = hsds self._fips = self._profile = None self._set_profile() self._process_regulations(regulations.df) def __repr__(self): msg = "{} for {}".format(self.__class__.__name__, self._excl_fpath) return msg def _set_profile(self): """Extract profile from excl h5.""" with ExclusionLayers(self._excl_fpath, hsds=self._hsds) as f: self._profile = f.profile def _process_regulations(self, regulations_df): """Parse the county regulations. Parse regulations, combine with county geometries from exclusions .h5 file. The county geometries are intersected with features to compute county specific exclusions. Parameters ---------- regulations : pandas.DataFrame Regulations table Returns ------- regulations: `geopandas.GeoDataFrame` GeoDataFrame with county level exclusion regulations merged with county geometries, use for intersecting with exclusion features. """ if regulations_df is None: return with ExclusionLayers(self._excl_fpath, hsds=self._hsds) as exc: self._fips = exc['cnty_fips'] cnty_fips_profile = exc.get_layer_profile('cnty_fips') if 'FIPS' not in regulations_df: msg = ('Regulations does not have county FIPS! Please add a ' '"FIPS" columns with the unique county FIPS values.') logger.error(msg) raise RuntimeError(msg) if 'geometry' not in regulations_df: regulations_df['geometry'] = None regulations_df = regulations_df[~regulations_df['FIPS'].isna()] regulations_df = regulations_df.set_index('FIPS') logger.info('Merging county geometries w/ local regulations') shapes_from_raster = rio_features.shapes( self._fips.astype(np.int32), transform=cnty_fips_profile['transform'] ) county_regs = [] for polygon, fips_code in shapes_from_raster: fips_code = int(fips_code) if fips_code in regulations_df.index: local_regs = regulations_df.loc[[fips_code]].copy() local_regs['geometry'] = shape(polygon) county_regs.append(local_regs) if county_regs: regulations_df = pd.concat(county_regs) regulations_df = gpd.GeoDataFrame( regulations_df, crs=self.profile['crs'], geometry='geometry' ) regulations_df = regulations_df.reset_index() regulations_df = regulations_df.to_crs(crs=self.profile['crs']) self._regulations.df = regulations_df @property def profile(self): """dict: Geotiff profile. """ return self._profile @property def regulations_table(self): """Regulations table. Returns ------- geopandas.GeoDataFrame | None """ return self._regulations.df @regulations_table.setter def regulations_table(self, regulations_table): self._process_regulations(regulations_table) def _write_exclusions(self, geotiff, exclusions, replace=False): """ Write exclusions to geotiff, replace if requested Parameters ---------- geotiff : str Path to geotiff file to save exclusions too exclusions : ndarray Rasterized array of exclusions. replace : bool, optional Flag to replace local layer data with arr if file already exists on disk. By default `False`. """ if os.path.exists(geotiff): _error_or_warn(geotiff, replace) Geotiff.write(geotiff, self.profile, exclusions) def _write_layer(self, out_layer, exclusions, replace=False): """Write exclusions to H5, replace if requested Parameters ---------- out_layer : str Name of new exclusion layer to add to h5. exclusions : ndarray Rasterized array of exclusions. replace : bool, optional Flag to replace local layer data with arr if layer already exists in the exclusion .h5 file. By default `False`. """ with ExclusionLayers(self._excl_fpath, hsds=self._hsds) as exc: layers = exc.layers if out_layer in layers: _error_or_warn(out_layer, replace) try: description = self.description except AttributeError: description = None LayeredH5(self._excl_fpath).write_layer_to_h5(exclusions, out_layer, self.profile, description=description) def _county_exclusions(self): """Yield county exclusion arguments. """ for ind, regulation_info in enumerate(self._regulations, start=1): exclusion, cnty = regulation_info logger.debug('Computing exclusions for {}/{} counties' .format(ind, len(self.regulations_table))) for args in self._local_exclusions_arguments(exclusion, cnty): yield exclusion, cnty, args
[docs] def compute_all_local_exclusions(self, max_workers=None): """Compute local exclusions for all counties either. Parameters ---------- max_workers : int, optional Number of workers to use for exclusions computation, if 1 run in serial, if > 1 run in parallel with that many workers, if `None` run in parallel on all available cores. By default `None`. Returns ------- exclusions : ndarray Raster array of exclusions. """ mw = max_workers or os.cpu_count() log_mem(logger) exclusions = None if mw > 1: logger.info('Computing local exclusions in parallel using {} ' 'workers'.format(mw)) spp_kwargs = {"max_workers": mw, "loggers": [__name__, 'reVX']} with SpawnProcessPool(**spp_kwargs) as exe: exclusions = self._compute_local_exclusions_in_chunks(exe, mw) else: logger.info('Computing local exclusions in serial') for ind, cnty_inf in enumerate(self._county_exclusions(), start=1): exclusion_value, cnty, args = cnty_inf out = self.compute_local_exclusions(exclusion_value, cnty, *args) local_exclusions, slices = out fips = cnty['FIPS'].unique() exclusions = self._combine_exclusions(exclusions, local_exclusions, fips, slices) logger.debug("Computed exclusions for {:,} counties" .format(ind)) if exclusions is None: exclusions = self.no_exclusions_array return exclusions
def _compute_local_exclusions_in_chunks(self, exe, max_submissions): """Compute exclusions in parallel using futures. """ futures, exclusions = {}, None for ind, reg in enumerate(self._county_exclusions(), start=1): exclusion_value, cnty, args = reg future = exe.submit(self.compute_local_exclusions, exclusion_value, cnty, *args) futures[future] = cnty['FIPS'].unique() if ind % max_submissions == 0: exclusions = self._collect_local_futures(futures, exclusions) exclusions = self._collect_local_futures(futures, exclusions) return exclusions def _collect_local_futures(self, futures, exclusions): """Collect all futures from the input dictionary. """ for future in as_completed(futures): new_exclusions, slices = future.result() exclusions = self._combine_exclusions(exclusions, new_exclusions, futures.pop(future), slices=slices) log_mem(logger) return exclusions
[docs] def compute_exclusions(self, out_layer=None, out_tiff=None, replace=False, max_workers=None): """ Compute exclusions for all states either in serial or parallel. Existing exclusions are computed if a regulations file was supplied during class initialization, otherwise generic exclusions are computed. Parameters ---------- out_layer : str, optional Name to save rasterized exclusions under in .h5 file. If `None`, exclusions will not be written to the .h5 file. By default `None`. out_tiff : str, optional Path to save geotiff containing rasterized exclusions. If `None`, exclusions will not be written to a geotiff file. By default `None`. replace : bool, optional Flag to replace geotiff if it already exists. By default `False`. max_workers : int, optional Number of workers to use for exclusion computation, if 1 run in serial, if > 1 run in parallel with that many workers, if `None`, run in parallel on all available cores. By default `None`. Returns ------- exclusions : ndarray Raster array of exclusions """ exclusions = self._compute_merged_exclusions(max_workers=max_workers) if out_layer is not None: logger.info('Saving exclusion layer to {} as {}' .format(self._excl_fpath, out_layer)) self._write_layer(out_layer, exclusions, replace=replace) if out_tiff is not None: logger.debug('Writing exclusions to {}'.format(out_tiff)) self._write_exclusions(out_tiff, exclusions, replace=replace) return exclusions
def _compute_merged_exclusions(self, max_workers=None): """Compute and merge local and generic exclusions, if necessary. """ mw = max_workers if self._regulations.locals_exist: self.pre_process_regulations() generic_exclusions_exist = self._regulations.generic_exists local_exclusions_exist = self._regulations.locals_exist if not generic_exclusions_exist and not local_exclusions_exist: msg = ("Found no exclusions to compute: No regulations detected, " "and generic multiplier not set.") logger.warning(msg) warn(msg) return self.no_exclusions_array if generic_exclusions_exist and not local_exclusions_exist: return self.compute_generic_exclusions(max_workers=mw) if local_exclusions_exist and not generic_exclusions_exist: local_excl = self.compute_all_local_exclusions(max_workers=mw) # merge ensures local exclusions are clipped county boundaries return self._merge_exclusions(None, local_excl) generic_exclusions = self.compute_generic_exclusions(max_workers=mw) local_exclusions = self.compute_all_local_exclusions(max_workers=mw) return self._merge_exclusions(generic_exclusions, local_exclusions) def _merge_exclusions(self, generic_exclusions, local_exclusions): """Merge local exclusions onto the generic exclusions.""" logger.info('Merging local exclusions onto the generic exclusions') local_fips = self.regulations_table["FIPS"].unique() return self._combine_exclusions(generic_exclusions, local_exclusions, local_fips, replace_existing=True) def _combine_exclusions(self, existing, additional=None, cnty_fips=None, slices=None, replace_existing=False): """Combine local exclusions using FIPS code""" if additional is None: return existing if existing is None: existing = self.no_exclusions_array.astype(additional.dtype) if slices is None: slices = tuple([slice(None)] * len(existing.shape)) if cnty_fips is None: local_exclusions = slice(None) else: local_exclusions = np.isin(self._fips[slices], cnty_fips) if replace_existing: new_local_exclusions = additional[local_exclusions] else: new_local_exclusions = self.exclusion_merge_func( existing[slices][local_exclusions], additional[local_exclusions]) existing[slices][local_exclusions] = new_local_exclusions return existing
[docs] @classmethod def run(cls, excl_fpath, features_path, out_fn, regulations, max_workers=None, replace=False, out_layers=None, hsds=False, **kwargs): """ Compute exclusions and write them to a geotiff. If a regulations file is given, compute local exclusions, otherwise compute generic exclusions. If both are provided, generic and local exclusions are merged such that the local exclusions override the generic ones. Parameters ---------- excl_fpath : str Path to .h5 file containing exclusion layers, will also be the location of any new exclusion layers. features_path : str Path to file or directory feature shape files. This path can contain any pattern that can be used in the glob function. For example, `/path/to/features/[A]*` would match with all the features in the directory `/path/to/features/` that start with "A". This input can also be a directory, but that directory must ONLY contain feature files. If your feature files are mixed with other files or directories, use something like `/path/to/features/*.geojson`. out_fn : str Path to output geotiff where exclusion data should be stored. regulations : `~reVX.utilities.AbstractBaseRegulations` subclass A regulations object used to extract exclusion regulation distances. max_workers : int, optional Number of workers to use for exclusion computation, if 1 run in serial, if > 1 run in parallel with that many workers, if `None`, run in parallel on all available cores. By default `None`. replace : bool, optional Flag to replace geotiff if it already exists. By default `False`. out_layers : dict, optional Dictionary mapping feature file names (with extension) to names of layers under which exclusions should be saved in the `excl_fpath` .h5 file. If `None` or empty dictionary, no layers are saved to the h5 file. By default `None`. hsds : bool, optional Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS behind HSDS. By default `False`. **kwargs Keyword args to exclusions calculator class. """ out_layers = out_layers or {} cls_init_kwargs = {"excl_fpath": excl_fpath, "regulations": regulations} cls_init_kwargs.update(kwargs) if os.path.exists(out_fn) and not replace: msg = ('{} already exists, exclusions will not be re-computed ' 'unless replace=True'.format(out_fn)) logger.error(msg) else: logger.info("Computing exclusions from {} and saving " "to {}".format(features_path, out_fn)) out_layer = out_layers.get(os.path.basename(features_path)) exclusions = cls(excl_fpath=excl_fpath, regulations=regulations, features=features_path, hsds=hsds, **kwargs) exclusions.compute_exclusions(out_tiff=out_fn, out_layer=out_layer, max_workers=max_workers, replace=replace)
def _error_or_warn(name, replace): """If replace, throw warning, otherwise throw error. """ if not replace: msg = ('{} already exists. To replace it set "replace=True"' .format(name)) logger.error(msg) raise IOError(msg) msg = ('{} already exists and will be replaced!'.format(name)) logger.warning(msg) warn(msg)