# Source code for reV.supply_curve.tech_mapping

# -*- coding: utf-8 -*-
"""reV tech mapping framework.

This module manages the exclusions-to-resource mapping.
The core of this module is a parallel cKDTree.

Created on Fri Jun 21 16:05:47 2019

@author: gbuster
"""
import logging
import os
from concurrent.futures import as_completed
from math import ceil
from warnings import warn

import h5py
import numpy as np
from scipy.spatial import cKDTree
from rex.resource import Resource
from rex.utilities.execution import SpawnProcessPool
from rex.utilities.utilities import check_res_file, res_dist_threshold

from reV.supply_curve.extent import SupplyCurveExtent, LATITUDE, LONGITUDE
from reV.utilities.exceptions import FileInputError, FileInputWarning

logger = logging.getLogger(__name__)


class TechMapping:
    """Framework to create map between tech layer (exclusions), res, and gen"""

    def __init__(self, excl_fpath, sc_resolution=1200):
        """
        Parameters
        ----------
        excl_fpath : str
            Filepath to exclusions h5 file, must contain latitude and
            longitude arrays to allow for mapping to resource points
        sc_resolution : int | None, optional
            Defines how many exclusion pixels are mapped at a time. Units
            indicate the length of one dimension, in pixels, of each square
            chunk to be mapped. By default, this value is 1200, which will
            map the exclusion pixels in 1200x1200 pixel chunks.

            .. Note:: This parameter does not affect the exclusion to
              resource (tech) mapping, which deviates from how the effect
              of the ``sc_resolution`` parameter works in other
              functionality within ``reV``.
        """
        self._excl_fpath = excl_fpath
        # Fail fast if the exclusions file is missing lat/lon datasets
        self._check_fout()

        with SupplyCurveExtent(
            self._excl_fpath, resolution=sc_resolution
        ) as sc:
            self._gids = np.array(list(range(len(sc))), dtype=np.uint32)
            self._excl_shape = sc.exclusions.shape
            self._n_excl = np.prod(self._excl_shape)
            self._sc_row_indices = sc.row_indices
            self._sc_col_indices = sc.col_indices
            self._excl_row_slices = sc.excl_row_slices
            self._excl_col_slices = sc.excl_col_slices
            logger.info(
                "Initialized TechMapping object with {} calc chunks "
                "for {} tech exclusion points".format(
                    len(self._gids), self._n_excl
                )
            )

    @staticmethod
    def _build_tree(res_fpath, dist_margin=1.05):
        """
        Build cKDTree from resource lat, lon coordinates. Compute minimum
        intra point distance between resource gids with provided extra
        margin.

        Parameters
        ----------
        res_fpath : str
            Filepath to .h5 resource file that we're mapping to.
        dist_margin : float, optional
            Extra margin to multiply times the computed distance between
            neighboring resource points, by default 1.05

        Returns
        -------
        tree : cKDTree
            cKDTree built from resource lat, lon coordinates
        dist_thresh : float
            Estimate of the distance between resource points. Calculated
            as half of the diagonal between closest resource points, with
            desired extra margin
        """
        __, hsds = check_res_file(res_fpath)
        with Resource(res_fpath, hsds=hsds) as f:
            lat_lons = f.lat_lon

        # pylint: disable=not-callable
        tree = cKDTree(lat_lons)
        dist_thresh = res_dist_threshold(
            lat_lons, tree=tree, margin=dist_margin
        )

        return tree, dist_thresh

    @staticmethod
    def _make_excl_iarr(shape):
        """
        Create 2D array of 1D index values for the flattened h5 excl extent

        Parameters
        ----------
        shape : tuple
            exclusion extent shape

        Returns
        -------
        iarr : ndarray
            2D array of 1D index values for the flattened h5 excl extent
        """
        iarr = np.arange(np.prod(shape), dtype=np.uint32)
        return iarr.reshape(shape)

    @staticmethod
    def _get_excl_slices(
        gid, sc_row_indices, sc_col_indices, excl_row_slices, excl_col_slices
    ):
        """
        Get the row and column slices of the exclusions grid corresponding
        to the supply curve point gid.

        Parameters
        ----------
        gid : int
            Supply curve point gid.
        sc_row_indices : list
            List of row indices in exclusion array for every sc_point gid
        sc_col_indices : list
            List of column indices in exclusion array for every sc_point gid
        excl_row_slices : list
            List representing the supply curve points rows. Each list entry
            contains the exclusion row slice that are included in the sc
            point.
        excl_col_slices : list
            List representing the supply curve points columns. Each list
            entry contains the exclusion columns slice that are included in
            the sc point.

        Returns
        -------
        row_slice : slice
            Exclusions grid row index slice corresponding to the sc point
            gid.
        col_slice : slice
            Exclusions grid col index slice corresponding to the sc point
            gid.
        """
        row_slice = excl_row_slices[sc_row_indices[gid]]
        col_slice = excl_col_slices[sc_col_indices[gid]]

        return row_slice, col_slice

    @classmethod
    def _get_excl_coords(cls, excl_fpath, gid, sc_row_indices,
                         sc_col_indices, excl_row_slices, excl_col_slices,
                         coord_labels=None):
        """
        Extract the exclusion coordinates for the desired supply curve
        point gid for TechMapping.

        Parameters
        ----------
        excl_fpath : str
            Filepath to exclusions h5 file, must contain latitude and
            longitude arrays to allow for mapping to resource points
        gid : int
            Supply curve gid with tech exclusion points to map to the
            resource meta points.
        sc_row_indices : list
            List of row indices in exclusion array for every sc_point gid
        sc_col_indices : list
            List of column indices in exclusion array for every sc_point gid
        excl_row_slices : list
            List representing the supply curve points rows. Each list entry
            contains the exclusion row slice that are included in the sc
            point.
        excl_col_slices : list
            List representing the supply curve points columns. Each list
            entry contains the exclusion columns slice that are included in
            the sc point.
        coord_labels : tuple, optional
            Labels for the coordinate datasets. If ``None`` (default), uses
            ``(LATITUDE, LONGITUDE)``. The sentinel avoids evaluating the
            module-level constants at class-definition time.

        Returns
        -------
        coords_out : ndarray
            2D array (Nx2) of the un-projected latitude, longitude of tech
            exclusion pixels within the specified gid point. Rows
            correspond to exclusion pixels of the specified gid, columns
            correspond to latitude and longitude, respectively.
        """
        if coord_labels is None:
            coord_labels = (LATITUDE, LONGITUDE)

        with h5py.File(excl_fpath, "r") as f:
            row_slice, col_slice = cls._get_excl_slices(
                gid,
                sc_row_indices,
                sc_col_indices,
                excl_row_slices,
                excl_col_slices,
            )
            try:
                lats = f[coord_labels[0]][row_slice, col_slice]
                lons = f[coord_labels[1]][row_slice, col_slice]
                coords_out = np.vstack((lats.flatten(), lons.flatten())).T
            except Exception as e:
                m = (
                    "Could not unpack coordinates for gid {} with "
                    "row/col slice {}/{}. Received the following "
                    "error:\n{}".format(gid, row_slice, col_slice, e)
                )
                logger.error(m)
                raise e

        return coords_out

    @classmethod
    def map_resource_gids(cls, excl_coords, tree, dist_thresh):
        """Map exclusion pixels to the resource meta.

        Parameters
        ----------
        excl_coords : ndarray
            2D Array of the un-projected latitude, longitude array of tech
            exclusion pixels. Rows correspond to exclusion pixels, columns
            correspond to latitude and longitude, respectively.
        tree : cKDTree
            cKDTree built from resource lat, lon coordinates
        dist_thresh : float
            Estimate of the distance between resource points. Calculated
            as half of the diagonal between closest resource points, with
            an extra 5% margin

        Returns
        -------
        ind : ndarray
            1D arrays of index values from the NN. Entries correspond to
            input exclusion pixels. Pixels farther than ``dist_thresh``
            from any resource point are marked with -1.
        """
        dist, ind = tree.query(excl_coords)
        # Pixels beyond the distance threshold map to no resource gid
        ind[(dist >= dist_thresh)] = -1

        return ind

    def initialize_dataset(self, dset, chunks=(128, 128)):
        """
        Initialize output dataset in exclusions h5 file. If dataset
        already exists, a warning will be issued.

        Parameters
        ----------
        dset : str
            Name of the dataset in the exclusions H5 file to create.
        chunks : tuple, optional
            Chunk size for the dataset, by default (128, 128).
        """
        with h5py.File(self._excl_fpath, "a") as f:
            if dset in list(f):
                wmsg = (
                    'TechMap results dataset "{}" already exists '
                    'in pre-existing Exclusions TechMapping file "{}"'.format(
                        dset, self._excl_fpath
                    )
                )
                logger.warning(wmsg)
                warn(wmsg, FileInputWarning)
            else:
                logger.info(
                    f"Initializing tech map dataset {dset} in "
                    f"{self._excl_fpath}"
                )
                # -1 fillvalue matches the "no resource gid" marker used
                # by map_resource_gids
                f.create_dataset(
                    dset,
                    shape=self._excl_shape,
                    dtype=np.int32,
                    chunks=chunks,
                    fillvalue=-1,
                )

    def _check_fout(self):
        """Check the TechMapping output file for cached data.

        Raises
        ------
        FileInputError
            If the exclusions h5 file is missing the latitude and/or
            longitude datasets required for mapping.
        """
        with h5py.File(self._excl_fpath, 'r') as f:
            if LATITUDE not in f or LONGITUDE not in f:
                emsg = ('Datasets "latitude" and/or "longitude" not in '
                        'pre-existing Exclusions TechMapping file "{}". '
                        'Cannot proceed.'
                        .format(os.path.basename(self._excl_fpath)))
                logger.exception(emsg)
                raise FileInputError(emsg)

    def map_resource(self, dset, res_fpath, dist_margin=1.05,
                     max_workers=None, batch_size=100):
        """
        Map all resource gids to exclusion gids. Save results to dset in
        exclusions h5 file.

        Parameters
        ----------
        dset : str
            Name of the output dataset in the exclusions H5 file to which
            the tech map will be saved.
        res_fpath : str
            Filepath to .h5 resource file that we're mapping to.
        dist_margin : float, optional
            Extra margin to multiply times the computed distance between
            neighboring resource points, by default 1.05
        max_workers : int, optional
            Number of cores to run mapping on. None uses all available
            cpus, by default None
        batch_size : int, optional
            Number of tasks to be submitted to parallel worker pool at one
            time, by default 100. As a rule of thumb, this number should
            be set to ~10x the number of max_workers. Higher values are
            not necessarily better, and may slow down processing and/or
            result in out-of-memory errors. Values less than the number
            of workers can also lead to slower processing, due to poor
            load balancing.
        """
        loggers = [__name__, "reV"]
        logger.info(
            f"Computing cKDtree for tech mapping using {res_fpath=!r} "
            f"and {dist_margin=!r}"
        )
        tree, dist_thresh = self._build_tree(
            res_fpath, dist_margin=dist_margin
        )
        with h5py.File(self._excl_fpath, "a") as f:
            # Record provenance of the mapping on the output dataset
            f[dset].attrs["distance_threshold"] = dist_thresh
            f[dset].attrs["src_res_fpath"] = res_fpath

        n_jobs = len(self._gids)
        n_batches = ceil(n_jobs / batch_size)
        gid_batches = np.array_split(self._gids, n_batches)
        logger.info(
            f"Kicking off {n_jobs} resource mapping jobs in {n_batches} "
            "batches."
        )

        n_finished = 0
        with SpawnProcessPool(max_workers=max_workers,
                              loggers=loggers) as exe:
            for gid_batch in gid_batches:
                futures = {}
                # iterate through split executions, submitting each to
                # worker
                for i, gid in enumerate(gid_batch):
                    # submit executions and append to futures list
                    excl_coords = self._get_excl_coords(
                        self._excl_fpath,
                        gid,
                        self._sc_row_indices,
                        self._sc_col_indices,
                        self._excl_row_slices,
                        self._excl_col_slices,
                    )
                    futures[
                        exe.submit(
                            self.map_resource_gids,
                            excl_coords,
                            tree,
                            dist_thresh,
                        )
                    ] = i

                # write each batch's results before submitting the next
                # one so only one batch of coords/results is in memory
                with h5py.File(self._excl_fpath, "a") as f:
                    indices = f[dset]
                    for future in as_completed(futures):
                        i = futures[future]
                        result = future.result()

                        gid = gid_batch[i]
                        row_slice, col_slice = self._get_excl_slices(
                            gid,
                            self._sc_row_indices,
                            self._sc_col_indices,
                            self._excl_row_slices,
                            self._excl_col_slices,
                        )
                        n_rows = row_slice.stop - row_slice.start
                        n_cols = col_slice.stop - col_slice.start
                        result_shape = (n_rows, n_cols)
                        indices[row_slice, col_slice] = result.reshape(
                            result_shape
                        )

                n_finished += 1
                logger.info(
                    "Parallel TechMapping batches completed: "
                    f"{n_finished} out of {n_batches}"
                )

    @classmethod
    def run(cls, excl_fpath, res_fpath, dset, sc_resolution=1200,
            dist_margin=1.05, max_workers=None, batch_size=1000):
        """Run parallel mapping and save to h5 file.

        Parameters
        ----------
        excl_fpath : str
            Filepath to exclusions data HDF5 file. This file must contain
            latitude and longitude datasets.
        res_fpath : str
            Filepath to HDF5 resource file (e.g. WTK or NSRDB) to which
            the exclusions will be mapped. Can refer to a single file
            (e.g., "/path/to/nsrdb_2024.h5" or a wild-card e.g.,
            "/path/to/nsrdb_{}.h5")
        dset : str
            Dataset name in the `excl_fpath` file to which the techmap
            (exclusions-to-resource mapping data) will be saved.

            .. Important:: If this dataset already exists in the h5 file,
              it will be overwritten.

        sc_resolution : int | None, optional
            Defines how many exclusion pixels are mapped at a time. Units
            indicate the length of one dimension, in pixels, of each
            square chunk to be mapped. By default, this value is 1200,
            which will map the exclusion pixels in 1200x1200 pixel chunks.

            .. Note:: This parameter does not affect the exclusion to
              resource (tech) mapping, which deviates from how the effect
              of the ``sc_resolution`` parameter works in other
              functionality within ``reV``.

        dist_margin : float, optional
            Extra margin to multiply times the computed distance between
            neighboring resource points, by default 1.05
        max_workers : int, optional
            Number of cores to run mapping on. None uses all available
            cpus, by default None
        batch_size : int, optional
            Number of tasks to be submitted to parallel worker pool at one
            time, by default 1000. As a rule of thumb, this number should
            be set to ~10x the number of max_workers. Higher values are
            not necessarily better, and may slow down processing and/or
            result in out-of-memory errors. Values less than the number of
            workers can also lead to slower processing, due to poor load
            balancing.
        """
        mapper = cls(excl_fpath, sc_resolution=sc_resolution)
        mapper.initialize_dataset(dset)
        mapper.map_resource(
            dset=dset,
            res_fpath=res_fpath,
            dist_margin=dist_margin,
            max_workers=max_workers,
            batch_size=batch_size,
        )