# -*- coding: utf-8 -*-
"""reV tech mapping framework.
This module manages the exclusions-to-resource mapping.
The core of this module is a parallel cKDTree.
Created on Fri Jun 21 16:05:47 2019
@author: gbuster
"""
import logging
import os
from concurrent.futures import as_completed
from math import ceil
from warnings import warn
import h5py
import numpy as np
from rex.resource import Resource
from rex.utilities.execution import SpawnProcessPool
from rex.utilities.utilities import res_dist_threshold
from scipy.spatial import cKDTree
from reV.supply_curve.extent import SupplyCurveExtent, LATITUDE, LONGITUDE
from reV.utilities.exceptions import FileInputError, FileInputWarning
logger = logging.getLogger(__name__)
class TechMapping:
    """Framework to create map between tech layer (exclusions), res, and gen"""

    def __init__(
        self, excl_fpath, res_fpath, sc_resolution=2560, dist_margin=1.05
    ):
        """
        Parameters
        ----------
        excl_fpath : str
            Filepath to exclusions h5 file, must contain latitude and longitude
            arrays to allow for mapping to resource points
        res_fpath : str
            Filepath to .h5 resource file that we're mapping to.
        sc_resolution : int | None, optional
            Supply curve resolution, does not affect the exclusion to resource
            (tech) mapping, but defines how many exclusion pixels are mapped
            at a time, by default 2560
        dist_margin : float, optional
            Extra margin to multiply times the computed distance between
            neighboring resource points, by default 1.05
        """
        self._excl_fpath = excl_fpath
        # Fail early if the exclusions file has no coordinate datasets.
        self._check_fout()

        self._tree, self._dist_thresh = self._build_tree(
            res_fpath, dist_margin=dist_margin
        )

        with SupplyCurveExtent(
            self._excl_fpath, resolution=sc_resolution
        ) as sc:
            self._sc_resolution = sc.resolution
            # One gid per supply curve point; each gid is one unit of work.
            self._gids = np.arange(len(sc), dtype=np.uint32)
            self._excl_shape = sc.exclusions.shape
            # np.prod: the np.product alias was removed in NumPy 2.0.
            self._n_excl = np.prod(self._excl_shape)
            self._sc_row_indices = sc.row_indices
            self._sc_col_indices = sc.col_indices
            self._excl_row_slices = sc.excl_row_slices
            self._excl_col_slices = sc.excl_col_slices

        logger.info(
            "Initialized TechMapping object with {} calc chunks "
            "for {} tech exclusion points".format(
                len(self._gids), self._n_excl
            )
        )

    @property
    def distance_threshold(self):
        """Get the upper bound on NN distance between excl and res points.

        Returns
        -------
        float
            Estimate the distance between resource points. Calculated as half
            of the diagonal between closest resource points, with desired
            extra margin
        """
        return self._dist_thresh

    @staticmethod
    def _build_tree(res_fpath, dist_margin=1.05):
        """
        Build cKDTree from resource lat, lon coordinates. Compute minimum
        intra point distance between resource gids with provided extra margin.

        Parameters
        ----------
        res_fpath : str
            Filepath to .h5 resource file that we're mapping to.
        dist_margin : float, optional
            Extra margin to multiply times the computed distance between
            neighboring resource points, by default 1.05

        Returns
        -------
        tree : cKDTree
            cKDTree built from resource lat, lon coordinates
        dist_thresh : float
            Estimate the distance between resource points. Calculated as half
            of the diagonal between closest resource points, with desired
            extra margin
        """
        with Resource(res_fpath) as f:
            lat_lons = f.lat_lon

        # pylint: disable=not-callable
        tree = cKDTree(lat_lons)
        dist_thresh = res_dist_threshold(
            lat_lons, tree=tree, margin=dist_margin
        )

        return tree, dist_thresh

    @staticmethod
    def _make_excl_iarr(shape):
        """
        Create 2D array of 1D index values for the flattened h5 excl extent

        Parameters
        ----------
        shape : tuple
            exclusion extent shape

        Returns
        -------
        iarr : ndarray
            2D array of 1D index values for the flattened h5 excl extent
        """
        # np.prod: the np.product alias was removed in NumPy 2.0.
        iarr = np.arange(np.prod(shape), dtype=np.uint32)

        return iarr.reshape(shape)

    @staticmethod
    def _get_excl_slices(
        gid, sc_row_indices, sc_col_indices, excl_row_slices, excl_col_slices
    ):
        """
        Get the row and column slices of the exclusions grid corresponding
        to the supply curve point gid.

        Parameters
        ----------
        gid : int
            Supply curve point gid.
        sc_row_indices : list
            List of row indices in exclusion array for every sc_point gid
        sc_col_indices : list
            List of column indices in exclusion array for every sc_point
            gid
        excl_row_slices : list
            List representing the supply curve points rows. Each list entry
            contains the exclusion row slice that are included in the sc
            point.
        excl_col_slices : list
            List representing the supply curve points columns. Each list entry
            contains the exclusion columns slice that are included in the sc
            point.

        Returns
        -------
        row_slice : slice
            Exclusions grid row slice corresponding to the sc point gid.
        col_slice : slice
            Exclusions grid col slice corresponding to the sc point gid.
        """
        # Two-step lookup: gid -> sc row/col index -> exclusion slice.
        row_slice = excl_row_slices[sc_row_indices[gid]]
        col_slice = excl_col_slices[sc_col_indices[gid]]

        return row_slice, col_slice

    @classmethod
    def _get_excl_coords(cls, excl_fpath, gids, sc_row_indices, sc_col_indices,
                         excl_row_slices, excl_col_slices,
                         coord_labels=(LATITUDE, LONGITUDE)):
        """
        Extract the exclusion coordinates for the desired gids for TechMapping.

        Parameters
        ----------
        excl_fpath : str
            Filepath to exclusions h5 file, must contain latitude and longitude
            arrays to allow for mapping to resource points
        gids : np.ndarray
            Supply curve gids with tech exclusion points to map to the
            resource meta points.
        sc_row_indices : list
            List of row indices in exclusion array for every sc_point gid
        sc_col_indices : list
            List of column indices in exclusion array for every sc_point
            gid
        excl_row_slices : list
            List representing the supply curve points rows. Each list entry
            contains the exclusion row slice that are included in the sc
            point.
        excl_col_slices : list
            List representing the supply curve points columns. Each list entry
            contains the exclusion columns slice that are included in the sc
            point.
        coord_labels : tuple
            Labels for the coordinate datasets, ordered (latitude, longitude).

        Returns
        -------
        coords_out : list
            List of arrays of the un-projected latitude, longitude array of
            tech exclusion points. List entries correspond to input gids.
        """
        coords_out = []
        with h5py.File(excl_fpath, "r") as f:
            for gid in gids:
                row_slice, col_slice = cls._get_excl_slices(
                    gid,
                    sc_row_indices,
                    sc_col_indices,
                    excl_row_slices,
                    excl_col_slices,
                )
                try:
                    lats = f[coord_labels[0]][row_slice, col_slice]
                    lons = f[coord_labels[1]][row_slice, col_slice]
                    # Two-column (lat, lon) array, one row per excl pixel.
                    emeta = np.vstack((lats.flatten(), lons.flatten())).T
                except Exception as e:
                    m = (
                        "Could not unpack coordinates for gid {} with "
                        "row/col slice {}/{}. Received the following "
                        "error:\n{}".format(gid, row_slice, col_slice, e)
                    )
                    logger.error(m)
                    # Bare raise re-raises the active exception unchanged.
                    raise

                coords_out.append(emeta)

        return coords_out

    @classmethod
    def map_resource_gids(
        cls,
        gids,
        excl_fpath,
        sc_row_indices,
        sc_col_indices,
        excl_row_slices,
        excl_col_slices,
        tree,
        dist_thresh,
    ):
        """Map exclusion gids to the resource meta.

        Parameters
        ----------
        gids : np.ndarray
            Supply curve gids with tech exclusion points to map to the
            resource meta points.
        excl_fpath : str
            Filepath to exclusions h5 file, must contain latitude and longitude
            arrays to allow for mapping to resource points
        sc_row_indices : list
            List of row indices in exclusion array for every sc_point gid
        sc_col_indices : list
            List of column indices in exclusion array for every sc_point
            gid
        excl_row_slices : list
            List representing the supply curve points rows. Each list entry
            contains the exclusion row slice that are included in the sc
            point.
        excl_col_slices : list
            List representing the supply curve points columns. Each list entry
            contains the exclusion columns slice that are included in the sc
            point.
        tree : cKDTree
            cKDTree built from resource lat, lon coordinates
        dist_thresh : float
            Upper bound on the nearest-neighbor distance. Exclusion points
            whose nearest resource point is at or beyond this distance are
            assigned -1.

        Returns
        -------
        ind_out : list
            List of arrays of index values from the NN. List entries correspond
            to input gids.
        """
        logger.debug(
            "Getting tech map coordinates for chunks {} through {}".format(
                gids[0], gids[-1]
            )
        )
        coords_out = cls._get_excl_coords(
            excl_fpath,
            gids,
            sc_row_indices,
            sc_col_indices,
            excl_row_slices,
            excl_col_slices,
        )

        logger.debug(
            "Running tech mapping for chunks {} through {}".format(
                gids[0], gids[-1]
            )
        )
        ind_out = []
        # coords_out holds exactly one coordinate array per input gid.
        for coords in coords_out:
            dist, ind = tree.query(coords)
            # Flag points with no resource neighbor within the threshold.
            ind[dist >= dist_thresh] = -1
            ind_out.append(ind)

        return ind_out

    @staticmethod
    def save_tech_map(
        excl_fpath,
        dset,
        indices,
        distance_threshold=None,
        res_fpath=None,
        chunks=(128, 128),
    ):
        """Save tech mapping indices and coordinates to an h5 output file.

        Parameters
        ----------
        excl_fpath : str
            Filepath to exclusions h5 file to add techmap to as 'dset'
        dset : str
            Dataset name in fpath_out to save mapping results to.
        indices : np.ndarray
            Index values of the NN resource point. -1 if no res point found.
            2D integer array with shape equal to the exclusions extent shape.
        distance_threshold : float, optional
            Distance upper bound to save as attr, by default None (attr is
            not written)
        res_fpath : str, optional
            Filepath to .h5 resource file that we're mapping to,
            by default None
        chunks : tuple
            Chunk shape of the 2D output datasets. Only used when the
            dataset is newly created.
        """
        logger.info('Writing tech map "{}" to {}'.format(dset, excl_fpath))

        shape = indices.shape
        # A chunk may not be larger than the dataset in either dimension.
        chunks = (min(shape[0], chunks[0]), min(shape[1], chunks[1]))

        with h5py.File(excl_fpath, "a") as f:
            # Group membership also resolves nested dataset paths.
            if dset in f:
                wmsg = (
                    'TechMap results dataset "{}" is being replaced '
                    'in pre-existing Exclusions TechMapping file "{}"'.format(
                        dset, excl_fpath
                    )
                )
                logger.warning(wmsg)
                warn(wmsg, FileInputWarning)
                f[dset][...] = indices
            else:
                f.create_dataset(
                    dset,
                    shape=shape,
                    dtype=indices.dtype,
                    data=indices,
                    chunks=chunks,
                )

            # Explicit None check so a threshold of 0.0 is still recorded.
            if distance_threshold is not None:
                f[dset].attrs["distance_threshold"] = distance_threshold

            if res_fpath:
                f[dset].attrs["src_res_fpath"] = res_fpath

        logger.info(
            'Successfully saved tech map "{}" to {}'.format(dset, excl_fpath)
        )

    def _check_fout(self):
        """Check that the exclusions file has the coordinate datasets.

        Raises
        ------
        FileInputError
            If the latitude and/or longitude dataset is missing from the
            exclusions file.
        """
        with h5py.File(self._excl_fpath, 'r') as f:
            if LATITUDE not in f or LONGITUDE not in f:
                emsg = ('Datasets "latitude" and/or "longitude" not in '
                        'pre-existing Exclusions TechMapping file "{}". '
                        'Cannot proceed.'
                        .format(os.path.basename(self._excl_fpath)))
                # No exception is active here, so use error(), not
                # exception(), to avoid logging an empty traceback.
                logger.error(emsg)
                raise FileInputError(emsg)

    def map_resource(self, max_workers=None, points_per_worker=10):
        """
        Map all resource gids to exclusion gids

        Parameters
        ----------
        max_workers : int, optional
            Number of cores to run mapping on. None uses all available cpus,
            by default None
        points_per_worker : int, optional
            Number of supply curve points to map to resource gids on each
            worker, by default 10

        Returns
        -------
        indices : np.ndarray
            Index values of the NN resource point. -1 if no res point found.
            2D integer array with shape equal to the exclusions extent shape.
        """
        n_chunks = ceil(len(self._gids) / points_per_worker)
        gid_chunks = np.array_split(self._gids, n_chunks)

        # init full output arrays; -1 marks "no resource point found"
        indices = np.full((self._n_excl,), -1, dtype=np.int32)
        iarr = self._make_excl_iarr(self._excl_shape)

        futures = {}
        loggers = [__name__, "reV"]
        with SpawnProcessPool(max_workers=max_workers, loggers=loggers) as exe:
            # submit one job per gid chunk, remembering each chunk index
            for i, gid_set in enumerate(gid_chunks):
                future = exe.submit(
                    self.map_resource_gids,
                    gid_set,
                    self._excl_fpath,
                    self._sc_row_indices,
                    self._sc_col_indices,
                    self._excl_row_slices,
                    self._excl_col_slices,
                    self._tree,
                    self.distance_threshold,
                )
                futures[future] = i

            n_finished = 0
            for future in as_completed(futures):
                n_finished += 1
                logger.info(
                    "Parallel TechMapping futures collected: "
                    "{} out of {}".format(n_finished, len(futures))
                )

                i = futures[future]
                result = future.result()

                # result[j] holds the NN indices for the j-th gid of chunk i
                for j, gid in enumerate(gid_chunks[i]):
                    row_slice, col_slice = self._get_excl_slices(
                        gid,
                        self._sc_row_indices,
                        self._sc_col_indices,
                        self._excl_row_slices,
                        self._excl_col_slices,
                    )
                    # Flat positions of this sc point's exclusion pixels.
                    ind_slice = iarr[row_slice, col_slice].flatten()
                    indices[ind_slice] = result[j]

        indices = indices.reshape(self._excl_shape)

        return indices

    @classmethod
    def run(
        cls,
        excl_fpath,
        res_fpath,
        dset=None,
        sc_resolution=2560,
        dist_margin=1.05,
        max_workers=None,
        points_per_worker=10,
    ):
        """Run parallel mapping and save to h5 file.

        Parameters
        ----------
        excl_fpath : str
            Filepath to exclusions h5 (tech layer). dset will be
            created in excl_fpath.
        res_fpath : str
            Filepath to .h5 resource file that we're mapping to.
        dset : str, optional
            Dataset name in excl_fpath to save mapping results to, if None
            do not save tech_map to excl_fpath, by default None
        sc_resolution : int | None, optional
            Supply curve resolution, does not affect the exclusion to resource
            (tech) mapping, but defines how many exclusion pixels are mapped
            at a time, by default 2560
        dist_margin : float, optional
            Extra margin to multiply times the computed distance between
            neighboring resource points, by default 1.05
        max_workers : int, optional
            Number of cores to run mapping on. None uses all available cpus,
            by default None
        points_per_worker : int, optional
            Number of supply curve points to map to resource gids on each
            worker, by default 10

        Returns
        -------
        indices : np.ndarray
            Index values of the NN resource point. -1 if no res point found.
            2D integer array with shape equal to the exclusions extent shape.
        """
        kwargs = {"dist_margin": dist_margin, "sc_resolution": sc_resolution}
        mapper = cls(excl_fpath, res_fpath, **kwargs)
        indices = mapper.map_resource(
            max_workers=max_workers, points_per_worker=points_per_worker
        )

        if dset:
            mapper.save_tech_map(
                excl_fpath,
                dset,
                indices,
                distance_threshold=mapper.distance_threshold,
                res_fpath=res_fpath,
            )

        return indices