Source code for reV.supply_curve.sc_aggregation

# -*- coding: utf-8 -*-
# pylint: disable=anomalous-backslash-in-string
"""reV supply curve aggregation framework.

Created on Fri Jun 21 13:24:31 2019

@author: gbuster
"""
from concurrent.futures import as_completed
import logging
import numpy as np
import psutil
import os
import pandas as pd
from warnings import warn

from reV.generation.base import BaseGen
from reV.handlers.exclusions import ExclusionLayers
from reV.supply_curve.aggregation import (AbstractAggFileHandler,
                                          BaseAggregation, Aggregation)
from reV.supply_curve.exclusions import FrictionMask
from reV.supply_curve.extent import SupplyCurveExtent
from reV.supply_curve.points import GenerationSupplyCurvePoint
from reV.utilities.exceptions import (EmptySupplyCurvePointError,
                                      OutputWarning, FileInputError,
                                      InputWarning)
from reV.utilities import log_versions

from rex.resource import Resource
from rex.multi_file_resource import MultiFileResource
from rex.utilities.execution import SpawnProcessPool

logger = logging.getLogger(__name__)


[docs]class SupplyCurveAggFileHandler(AbstractAggFileHandler): """ Framework to handle aggregation summary context managers: - exclusions .h5 file - generation .h5 file - econ .h5 file (optional) - friction surface .h5 file (optional) - variable power density .csv (optional) """ def __init__(self, excl_fpath, gen_fpath, econ_fpath=None, data_layers=None, power_density=None, excl_dict=None, friction_fpath=None, friction_dset=None, area_filter_kernel='queen', min_area=None): """ Parameters ---------- excl_fpath : str | list | tuple Filepath to exclusions h5 with techmap dataset (can be one or more filepaths). gen_fpath : str Filepath to .h5 reV generation output results. econ_fpath : str | None Filepath to .h5 reV econ output results. This is optional and only used if the lcoe_dset is not present in the gen_fpath file. data_layers : None | dict Aggregation data layers. Must be a dictionary keyed by data label name. Each value must be another dictionary with "dset", "method", and "fpath". power_density : float | str | None Power density in MW/km2 or filepath to variable power density file. None will attempt to infer a constant power density from the generation meta data technology. Variable power density csvs must have "gid" and "power_density" columns where gid is the resource gid (typically wtk or nsrdb gid) and the power_density column is in MW/km2. excl_dict : dict | None Dictionary of exclusion keyword arugments of the format {layer_dset_name: {kwarg: value}} where layer_dset_name is a dataset in the exclusion h5 file and kwarg is a keyword argument to the reV.supply_curve.exclusions.LayerMask class. friction_fpath : str | None Filepath to friction surface data (cost based exclusions). Must be paired with friction_dset. The friction data must be the same shape as the exclusions. Friction input creates a new output "mean_lcoe_friction" which is the nominal LCOE multiplied by the friction data. friction_dset : str | None Dataset name in friction_fpath for the friction surface data. Must be paired with friction_fpath. Must be same shape as exclusions. area_filter_kernel : str Contiguous area filter method to use on final exclusions mask min_area : float | None Minimum required contiguous area filter in sq-km """ super().__init__(excl_fpath, excl_dict=excl_dict, area_filter_kernel=area_filter_kernel, min_area=min_area) self._gen = self._open_gen_econ_resource(gen_fpath, econ_fpath) # pre-initialize the resource meta data _ = self._gen.meta self._data_layers = data_layers self._power_density = power_density self._parse_power_density() self._friction_layer = None if friction_fpath is not None and friction_dset is not None: self._friction_layer = FrictionMask(friction_fpath, friction_dset) if not np.all(self._friction_layer.shape == self._excl.shape): e = ('Friction layer shape {} must match exclusions shape {}!' .format(self._friction_layer.shape, self._excl.shape)) logger.error(e) raise FileInputError(e) @staticmethod def _open_gen_econ_resource(gen_fpath, econ_fpath): """Open a rex resource file handler for the reV generation and (optionally) the reV econ output(s). Parameters ---------- gen_fpath : str Filepath to .h5 reV generation output results. econ_fpath : str | None Filepath to .h5 reV econ output results. This is optional and only used if the lcoe_dset is not present in the gen_fpath file. Returns ------- handler : Resource | MultiFileResource Open resource handler initialized with gen_fpath and (optionally) econ_fpath. """ handler = None is_gen_h5 = isinstance(gen_fpath, str) and gen_fpath.endswith('.h5') is_econ_h5 = isinstance(econ_fpath, str) and econ_fpath.endswith('.h5') if is_gen_h5 and not is_econ_h5: handler = Resource(gen_fpath) elif is_gen_h5 and is_econ_h5: handler = MultiFileResource([gen_fpath, econ_fpath], check_files=True) return handler def _parse_power_density(self): """Parse the power density input. If file, open file handler.""" if isinstance(self._power_density, str): self._pdf = self._power_density if self._pdf.endswith('.csv'): self._power_density = pd.read_csv(self._pdf) if ('gid' in self._power_density and 'power_density' in self._power_density): self._power_density = self._power_density.set_index('gid') else: msg = ('Variable power density file must include "gid" ' 'and "power_density" columns, but received: {}' .format(self._power_density.columns.values)) logger.error(msg) raise FileInputError(msg) else: msg = ('Variable power density file must be csv but received: ' '{}'.format(self._pdf)) logger.error(msg) raise FileInputError(msg)
[docs] def close(self): """Close all file handlers.""" self._excl.close() self._gen.close() if self._friction_layer is not None: self._friction_layer.close()
@property def gen(self): """Get the gen file handler object. Returns ------- _gen : Outputs reV gen outputs handler object. """ return self._gen @property def data_layers(self): """Get the data layers object. Returns ------- _data_layers : dict Data layers namespace. """ return self._data_layers @property def power_density(self): """Get the power density object. Returns ------- _power_density : float | None | pd.DataFrame Constant power density float, None, or opened dataframe with (resource) "gid" and "power_density columns". """ return self._power_density @property def friction_layer(self): """Get the friction layer (cost based exclusions). Returns ------- friction_layer : None | FrictionMask Friction layer with scalar friction values if valid friction inputs were entered. Otherwise, None to not apply friction layer. """ return self._friction_layer
[docs]class SupplyCurveAggregation(BaseAggregation): """SupplyCurveAggregation""" def __init__(self, excl_fpath, tm_dset, econ_fpath=None, excl_dict=None, area_filter_kernel='queen', min_area=None, resolution=64, excl_area=None, res_fpath=None, gids=None, pre_extract_inclusions=False, res_class_dset=None, res_class_bins=None, cf_dset='cf_mean-means', lcoe_dset='lcoe_fcr-means', h5_dsets=None, data_layers=None, power_density=None, friction_fpath=None, friction_dset=None, cap_cost_scale=None, recalc_lcoe=True): """reV supply curve points aggregation framework. ``reV`` supply curve aggregation combines a high-resolution (e.g. 90m) exclusion dataset with a (typically) lower resolution (e.g. 2km) generation dataset by mapping all data onto the high- resolution grid and aggregating it by a large factor (e.g. 64 or 128). The result is coarsely-gridded data that summarizes capacity and generation potential as well as associated economics under a particular land access scenario. This module can also summarize extra data layers during the aggregation process, allowing for complementary land characterization analysis. Parameters ---------- excl_fpath : str | list | tuple Filepath to exclusions data HDF5 file. The exclusions HDF5 file should contain the layers specified in `excl_dict` and `data_layers`. These layers may also be spread out across multiple HDF5 files, in which case this input should be a list or tuple of filepaths pointing to the files containing the layers. Note that each data layer must be uniquely defined (i.e.only appear once and in a single input file). tm_dset : str Dataset name in the `excl_fpath` file containing the techmap (exclusions-to-resource mapping data). This data layer links the supply curve GID's to the generation GID's that are used to evaluate performance metrics such as ``mean_cf``. .. Important:: This dataset uniquely couples the (typically high-resolution) exclusion layers to the (typically lower-resolution) resource data. Therefore, a separate techmap must be used for every unique combination of resource and exclusion coordinates. .. Note:: If executing ``reV`` from the command line, you can specify a name that is not in the exclusions HDF5 file, and ``reV`` will calculate the techmap for you. Note however that computing the techmap and writing it to the exclusion HDF5 file is a blocking operation, so you may only run a single ``reV`` aggregation step at a time this way. econ_fpath : str, optional Filepath to HDF5 file with ``reV`` econ output results containing an `lcoe_dset` dataset. If ``None``, `lcoe_dset` should be a dataset in the `gen_fpath` HDF5 file that aggregation is executed on. .. Note:: If executing ``reV`` from the command line, this input can be set to ``"PIPELINE"`` to parse the output from one of these preceding pipeline steps: ``multi-year``, ``collect``, or ``generation``. However, note that duplicate executions of any of these commands within the pipeline may invalidate this parsing, meaning the `econ_fpath` input will have to be specified manually. By default, ``None``. excl_dict : dict | None Dictionary of exclusion keyword arguments of the format ``{layer_dset_name: {kwarg: value}}``, where ``layer_dset_name`` is a dataset in the exclusion h5 file and the ``kwarg: value`` pair is a keyword argument to the :class:`reV.supply_curve.exclusions.LayerMask` class. For example:: excl_dict = { "typical_exclusion": { "exclude_values": 255, }, "another_exclusion": { "exclude_values": [2, 3], "weight": 0.5 }, "exclusion_with_nodata": { "exclude_range": [10, 100], "exclude_nodata": True, "nodata_value": -1 }, "partial_setback": { "use_as_weights": True }, "height_limit": { "exclude_range": [0, 200] }, "slope": { "include_range": [0, 20] }, "developable_land": { "force_include_values": 42 }, "more_developable_land": { "force_include_range": [5, 10] }, "viewsheds": { "exclude_values": 1, "extent": { "layer": "federal_parks", "include_range": [1, 5] } } ... } Note that all the keys given in this dictionary should be datasets of the `excl_fpath` file. If ``None`` or empty dictionary, no exclusions are applied. By default, ``None``. area_filter_kernel : {"queen", "rook"}, optional Contiguous area filter method to use on final exclusions mask. The filters are defined as:: # Queen: # Rook: [[1,1,1], [[0,1,0], [1,1,1], [1,1,1], [1,1,1]] [0,1,0]] These filters define how neighboring pixels are "connected". Once pixels in the final exclusion layer are connected, the area of each resulting cluster is computed and compared against the `min_area` input. Any cluster with an area less than `min_area` is excluded from the final mask. This argument has no effect if `min_area` is ``None``. By default, ``"queen"``. min_area : float, optional Minimum area (in km\ :sup:`2`) required to keep an isolated cluster of (included) land within the resulting exclusions mask. Any clusters of land with areas less than this value will be marked as exclusions. See the documentation for `area_filter_kernel` for an explanation of how the area of each land cluster is computed. If ``None``, no area filtering is performed. By default, ``None``. resolution : int, optional Supply Curve resolution. This value defines how many pixels are in a single side of a supply curve cell. For example, a value of ``64`` would generate a supply curve where the side of each supply curve cell is ``64x64`` exclusion pixels. By default, ``64``. excl_area : float, optional Area of a single exclusion mask pixel (in km\ :sup:`2`). If ``None``, this value will be inferred from the profile transform attribute in `excl_fpath`. By default, ``None``. res_fpath : str, optional Filepath to HDF5 resource file (e.g. WTK or NSRDB). This input is required if techmap dset is to be created or if the ``gen_fpath`` input to the ``summarize`` or ``run`` methods is ``None``. By default, ``None``. gids : list, optional List of supply curve point gids to get summary for. If you would like to obtain all available ``reV`` supply curve points to run, you can use the :class:`reV.supply_curve.extent.SupplyCurveExtent` class like so:: import pandas as pd from reV.supply_curve.extent import SupplyCurveExtent excl_fpath = "..." resolution = ... tm_dset = "..." with SupplyCurveExtent(excl_fpath, resolution) as sc: gids = sc.valid_sc_points(tm_dset).tolist() ... If ``None``, supply curve aggregation is computed for all gids in the supply curve extent. By default, ``None``. pre_extract_inclusions : bool, optional Optional flag to pre-extract/compute the inclusion mask from the `excl_dict` input. It is typically faster to compute the inclusion mask on the fly with parallel workers. By default, ``False``. res_class_dset : str, optional Name of dataset in the ``reV`` generation HDF5 output file containing resource data. If ``None``, no aggregated resource classification is performed (i.e. no ``mean_res`` output), and the `res_class_bins` is ignored. By default, ``None``. res_class_bins : list, optional Optional input to perform separate aggregations for various resource data ranges. If ``None``, only a single aggregation per supply curve point is performed. Otherwise, this input should be a list of floats or ints representing the resource bin boundaries. One aggregation per resource value range is computed, and only pixels within the given resource range are aggregated. By default, ``None``. cf_dset : str, optional Dataset name from the ``reV`` generation HDF5 output file containing a 1D dataset of mean capacity factor values. This dataset will be mapped onto the high-resolution grid and used to compute the mean capacity factor for non-excluded area. By default, ``"cf_mean-means"``. lcoe_dset : str, optional Dataset name from the ``reV`` generation HDF5 output file containing a 1D dataset of mean LCOE values. This dataset will be mapped onto the high-resolution grid and used to compute the mean LCOE for non-excluded area, but only if the LCOE is not re-computed during processing (see the `recalc_lcoe` input for more info). By default, ``"lcoe_fcr-means"``. h5_dsets : list, optional Optional list of additional datasets from the ``reV`` generation/econ HDF5 output file to aggregate. If ``None``, no extra datasets are aggregated. .. WARNING:: This input is meant for passing through 1D datasets. If you specify a 2D or higher-dimensional dataset, you may run into memory errors. If you wish to aggregate 2D datasets, see the rep-profiles module. By default, ``None``. data_layers : dict, optional Dictionary of aggregation data layers of the format:: data_layers = { "output_layer_name": { "dset": "layer_name", "method": "mean", "fpath": "/path/to/data.h5" }, "another_output_layer_name": { "dset": "input_layer_name", "method": "mode", # optional "fpath" key omitted }, ... } The ``"output_layer_name"`` is the column name under which the aggregated data will appear in the output CSV file. The ``"output_layer_name"`` does not have to match the ``dset`` input value. The latter should match the layer name in the HDF5 from which the data to aggregate should be pulled. The ``method`` should be one of ``{"mode", "mean", "min", "max", "sum", "category"}``, describing how the high-resolution data should be aggregated for each supply curve point. ``fpath`` is an optional key that can point to an HDF5 file containing the layer data. If left out, the data is assumed to exist in the file(s) specified by the `excl_fpath` input. If ``None``, no data layer aggregation is performed. By default, ``None`` power_density : float | str, optional Power density value (in MW/km\ :sup:`2`) or filepath to variable power density CSV file containing the following columns: - ``gid`` : resource gid (typically wtk or nsrdb gid) - ``power_density`` : power density value (in MW/km\ :sup:`2`) If ``None``, a constant power density is inferred from the generation meta data technology. By default, ``None``. friction_fpath : str, optional Filepath to friction surface data (cost based exclusions). Must be paired with the `friction_dset` input below. The friction data must be the same shape as the exclusions. Friction input creates a new output column ``"mean_lcoe_friction"`` which is the nominal LCOE multiplied by the friction data. If ``None``, no friction data is aggregated. By default, ``None``. friction_dset : str, optional Dataset name in friction_fpath for the friction surface data. Must be paired with the `friction_fpath` above. If ``None``, no friction data is aggregated. By default, ``None``. cap_cost_scale : str, optional Optional LCOE scaling equation to implement "economies of scale". Equations must be in python string format and must return a scalar value to multiply the capital cost by. Independent variables in the equation should match the names of the columns in the ``reV`` supply curve aggregation output table (see the documentation of :class:`~reV.supply_curve.sc_aggregation.SupplyCurveAggregation` for details on available outputs). If ``None``, no economies of scale are applied. By default, ``None``. recalc_lcoe : bool, optional Flag to re-calculate the LCOE from the multi-year mean capacity factor and annual energy production data. This requires several datasets to be aggregated in the h5_dsets input: - ``system_capacity`` - ``fixed_charge_rate`` - ``capital_cost`` - ``fixed_operating_cost`` - ``variable_operating_cost`` If any of these datasets are missing from the ``reV`` generation HDF5 output, or if `recalc_lcoe` is set to ``False``, the mean LCOE will be computed from the data stored under the `lcoe_dset` instead. By default, ``True``. Examples -------- Standard outputs: sc_gid : int Unique supply curve gid. This is the enumerated supply curve points, which can have overlapping geographic locations due to different resource bins at the same geographic SC point. res_gids : list Stringified list of resource gids (e.g. original WTK or NSRDB resource GIDs) corresponding to each SC point. gen_gids : list Stringified list of generation gids (e.g. GID in the reV generation output, which corresponds to the reV project points and not necessarily the resource GIDs). gid_counts : list Stringified list of the sum of inclusion scalar values corresponding to each `gen_gid` and `res_gid`, where 1 is included, 0 is excluded, and 0.7 is included with 70 percent of available land. Each entry in this list is associated with the corresponding entry in the `gen_gids` and `res_gids` lists. n_gids : int Total number of included pixels. This is a boolean sum and considers partial inclusions to be included (e.g. 1). mean_cf : float Mean capacity factor of each supply curve point (the arithmetic mean is weighted by the inclusion layer) (unitless). mean_lcoe : float Mean LCOE of each supply curve point (the arithmetic mean is weighted by the inclusion layer). Units match the reV econ output ($/MWh). By default, the LCOE is re-calculated using the multi-year mean capacity factor and annual energy production. This requires several datasets to be aggregated in the h5_dsets input: ``fixed_charge_rate``, ``capital_cost``, ``fixed_operating_cost``, ``annual_energy_production``, and ``variable_operating_cost``. This recalc behavior can be disabled by setting ``recalc_lcoe=False``. mean_res : float Mean resource, the resource dataset to average is provided by the user in `res_class_dset`. The arithmetic mean is weighted by the inclusion layer. capacity : float Total capacity of each supply curve point (MW). Units are contingent on the `power_density` input units of MW/km2. area_sq_km : float Total included area for each supply curve point in km2. This is based on the nominal area of each exclusion pixel which by default is calculated from the exclusion profile attributes. The NREL reV default is 0.0081 km2 pixels (90m x 90m). The area sum considers partial inclusions. latitude : float Supply curve point centroid latitude coordinate, in degrees (does not consider exclusions). longitude : float Supply curve point centroid longitude coordinate, in degrees (does not consider exclusions). country : str Country of the supply curve point based on the most common country of the associated resource meta data. Does not consider exclusions. state : str State of the supply curve point based on the most common state of the associated resource meta data. Does not consider exclusions. county : str County of the supply curve point based on the most common county of the associated resource meta data. Does not consider exclusions. elevation : float Mean elevation of the supply curve point based on the mean elevation of the associated resource meta data. Does not consider exclusions. timezone : int UTC offset of local timezone based on the most common timezone of the associated resource meta data. Does not consider exclusions. sc_point_gid : int Spatially deterministic supply curve point gid. Duplicate `sc_point_gid` values can exist due to resource binning. sc_row_ind : int Row index of the supply curve point in the aggregated exclusion grid. sc_col_ind : int Column index of the supply curve point in the aggregated exclusion grid res_class : int Resource class for the supply curve gid. Each geographic supply curve point (`sc_point_gid`) can have multiple resource classes associated with it, resulting in multiple supply curve gids (`sc_gid`) associated with the same spatially deterministic supply curve point. Optional outputs: mean_friction : float Mean of the friction data provided in 'friction_fpath' and 'friction_dset'. The arithmetic mean is weighted by boolean inclusions and considers partial inclusions to be included. mean_lcoe_friction : float Mean of the nominal LCOE multiplied by mean_friction value. mean_{dset} : float Mean input h5 dataset(s) provided by the user in 'h5_dsets'. These mean calculations are weighted by the partial inclusion layer. data_layers : float | int | str | dict Requested data layer aggregations, each data layer must be the same shape as the exclusion layers. - mode: int | str Most common value of a given data layer after applying the boolean inclusion mask. - mean : float Arithmetic mean value of a given data layer weighted by the scalar inclusion mask (considers partial inclusions). - min : float | int Minimum value of a given data layer after applying the boolean inclusion mask. - max : float | int Maximum value of a given data layer after applying the boolean inclusion mask. - sum : float Sum of a given data layer weighted by the scalar inclusion mask (considers partial inclusions). - category : dict Dictionary mapping the unique values in the `data_layer` to the sum of inclusion scalar values associated with all pixels with that unique value. """ log_versions(logger) logger.info('Initializing SupplyCurveAggregation...') logger.debug('Exclusion filepath: {}'.format(excl_fpath)) logger.debug('Exclusion dict: {}'.format(excl_dict)) super().__init__(excl_fpath, tm_dset, excl_dict=excl_dict, area_filter_kernel=area_filter_kernel, min_area=min_area, resolution=resolution, excl_area=excl_area, res_fpath=res_fpath, gids=gids, pre_extract_inclusions=pre_extract_inclusions) self._econ_fpath = econ_fpath self._res_class_dset = res_class_dset self._res_class_bins = self._convert_bins(res_class_bins) self._cf_dset = cf_dset self._lcoe_dset = lcoe_dset self._h5_dsets = h5_dsets self._cap_cost_scale = cap_cost_scale self._power_density = power_density self._friction_fpath = friction_fpath self._friction_dset = friction_dset self._data_layers = data_layers self._recalc_lcoe = recalc_lcoe logger.debug('Resource class bins: {}'.format(self._res_class_bins)) if self._cap_cost_scale is not None: if self._h5_dsets is None: self._h5_dsets = [] self._h5_dsets += list(BaseGen.LCOE_ARGS) self._h5_dsets = list(set(self._h5_dsets)) if self._power_density is None: msg = ('Supply curve aggregation power density not specified. ' 'Will try to infer based on lookup table: {}' .format(GenerationSupplyCurvePoint.POWER_DENSITY)) logger.warning(msg) warn(msg, InputWarning) self._check_data_layers() def _check_data_layers(self, methods=('mean', 'max', 'min', 'mode', 'sum', 'category')): """Run pre-flight checks on requested aggregation data layers. Parameters ---------- methods : list | tuple Data layer aggregation methods that are available to the user. """ if self._data_layers is not None: logger.debug('Checking data layers...') with ExclusionLayers(self._excl_fpath) as f: shape_base = f.shape for k, v in self._data_layers.items(): if 'dset' not in v: raise KeyError('Data aggregation "dset" data layer "{}" ' 'must be specified.'.format(k)) if 'method' not in v: raise KeyError('Data aggregation "method" data layer "{}" ' 'must be specified.'.format(k)) elif v['method'].lower() not in methods: raise ValueError('Cannot recognize data layer agg method: ' '"{}". Can only do: {}.' .format(v['method'], methods)) if 'fpath' in v: with ExclusionLayers(v['fpath']) as f: try: mismatched_shapes = any(f.shape != shape_base) except TypeError: mismatched_shapes = f.shape != shape_base if mismatched_shapes: msg = ('Data shape of data layer "{}" is {}, ' 'which does not match the baseline ' 'exclusions shape {}.' .format(k, f.shape, shape_base)) raise FileInputError(msg) logger.debug('Finished checking data layers.') @staticmethod def _get_res_gen_lcoe_data(gen, res_class_dset, res_class_bins, cf_dset, lcoe_dset): """Extract the basic resource / generation / lcoe data to be used in the aggregation process. Parameters ---------- gen : Resource | MultiFileResource Open rex resource handler initialized from gen_fpath and (optionally) econ_fpath. res_class_dset : str | None Dataset in the generation file dictating resource classes. None if no resource classes. res_class_bins : list | None List of two-entry lists dictating the resource class bins. None if no resource classes. cf_dset : str Dataset name from f_gen containing capacity factor mean values. lcoe_dset : str Dataset name from f_gen containing LCOE mean values. Returns ------- res_data : np.ndarray | None Extracted resource data from res_class_dset res_class_bins : list List of resouce class bin ranges. cf_data : np.ndarray | None Capacity factor data extracted from cf_dset in gen lcoe_data : np.ndarray | None LCOE data extracted from lcoe_dset in gen """ dset_list = (res_class_dset, cf_dset, lcoe_dset) gen_dsets = [] if gen is None else gen.datasets labels = ('res_class_dset', 'cf_dset', 'lcoe_dset') temp = [None, None, None] if isinstance(gen, Resource): source_fps = [gen.h5_file] elif isinstance(gen, MultiFileResource): source_fps = gen._h5_files else: msg = ('Did not recognize gen object input of type "{}": {}' .format(type(gen), gen)) logger.error(msg) raise TypeError(msg) for i, dset in enumerate(dset_list): if dset in gen_dsets: _warn_about_large_datasets(gen, dset) temp[i] = gen[dset] elif dset not in gen_dsets and dset is not None: w = ('Could not find "{}" input as "{}" in source files: {}. ' 'Available datasets: {}' .format(labels[i], dset, source_fps, gen_dsets)) logger.warning(w) warn(w, OutputWarning) res_data, cf_data, lcoe_data = temp if res_class_dset is None or res_class_bins is None: res_class_bins = [None] return res_data, res_class_bins, cf_data, lcoe_data @staticmethod def _get_extra_dsets(gen, h5_dsets): """Extract extra ancillary datasets to be used in the aggregation process Parameters ---------- gen : Resource | MultiFileResource Open rex resource handler initialized from gen_fpath and (optionally) econ_fpath. h5_dsets : list | None Optional list of additional datasets from the source h5 gen/econ files to aggregate. Returns ------- h5_dsets_data : dict | None If additional h5_dsets are requested, this will be a dictionary keyed by the h5 dataset names. The corresponding values will be the extracted arrays from the h5 files. """ # look for the datasets required by the LCOE re-calculation and make # lists of the missing datasets gen_dsets = [] if gen is None else gen.datasets lcoe_recalc_req = ('fixed_charge_rate', 'capital_cost', 'fixed_operating_cost', 'variable_operating_cost', 'system_capacity') missing_lcoe_source = [k for k in lcoe_recalc_req if k not in gen_dsets] missing_lcoe_request = [] if isinstance(gen, Resource): source_fps = [gen.h5_file] elif isinstance(gen, MultiFileResource): source_fps = gen._h5_files else: msg = ('Did not recognize gen object input of type "{}": {}' .format(type(gen), gen)) logger.error(msg) raise TypeError(msg) h5_dsets_data = None if h5_dsets is not None: missing_lcoe_request = [k for k in lcoe_recalc_req if k not in h5_dsets] if not isinstance(h5_dsets, (list, tuple)): e = ('Additional h5_dsets argument must be a list or tuple ' 'but received: {} {}'.format(type(h5_dsets), h5_dsets)) logger.error(e) raise TypeError(e) missing_h5_dsets = [k for k in h5_dsets if k not in gen_dsets] if any(missing_h5_dsets): msg = ('Could not find requested h5_dsets "{}" in ' 'source files: {}. Available datasets: {}' .format(missing_h5_dsets, source_fps, gen_dsets)) logger.error(msg) raise FileInputError(msg) h5_dsets_data = {dset: gen[dset] for dset in h5_dsets} if any(missing_lcoe_source): msg = ('Could not find the datasets in the gen source file that ' 'are required to re-calculate the multi-year LCOE. If you ' 'are running a multi-year job, it is strongly suggested ' 'you pass through these datasets to re-calculate the LCOE ' 'from the multi-year mean CF: {}' .format(missing_lcoe_source)) logger.warning(msg) warn(msg, InputWarning) if any(missing_lcoe_request): msg = ('It is strongly advised that you include the following ' 'datasets in the h5_dsets request in order to re-calculate ' 'the LCOE from the multi-year mean CF and AEP: {}' .format(missing_lcoe_request)) logger.warning(msg) warn(msg, InputWarning) return h5_dsets_data
[docs] @classmethod def run_serial(cls, excl_fpath, gen_fpath, tm_dset, gen_index, econ_fpath=None, excl_dict=None, inclusion_mask=None, area_filter_kernel='queen', min_area=None, resolution=64, gids=None, args=None, res_class_dset=None, res_class_bins=None, cf_dset='cf_mean-means', lcoe_dset='lcoe_fcr-means', h5_dsets=None, data_layers=None, power_density=None, friction_fpath=None, friction_dset=None, excl_area=None, cap_cost_scale=None, recalc_lcoe=True): """Standalone method to create agg summary - can be parallelized. Parameters ---------- excl_fpath : str | list | tuple Filepath to exclusions h5 with techmap dataset (can be one or more filepaths). gen_fpath : str Filepath to .h5 reV generation output results. tm_dset : str Dataset name in the exclusions file containing the exclusions-to-resource mapping data. gen_index : np.ndarray Array of generation gids with array index equal to resource gid. Array value is -1 if the resource index was not used in the generation run. econ_fpath : str | None Filepath to .h5 reV econ output results. This is optional and only used if the lcoe_dset is not present in the gen_fpath file. excl_dict : dict | None Dictionary of exclusion keyword arugments of the format {layer_dset_name: {kwarg: value}} where layer_dset_name is a dataset in the exclusion h5 file and kwarg is a keyword argument to the reV.supply_curve.exclusions.LayerMask class. inclusion_mask : np.ndarray | dict | optional 2D array pre-extracted inclusion mask where 1 is included and 0 is excluded. This must be either match the full exclusion shape or be a dict lookup of single-sc-point exclusion masks corresponding to the gids input and keyed by gids, by default None which will calculate exclusions on the fly for each sc point. area_filter_kernel : str Contiguous area filter method to use on final exclusions mask min_area : float | None Minimum required contiguous area filter in sq-km resolution : int | None SC resolution, must be input in combination with gid. Prefered option is to use the row/col slices to define the SC point instead. gids : list | None List of supply curve point gids to get summary for (can use to subset if running in parallel), or None for all gids in the SC extent, by default None args : list | None List of positional args for sc_point_method res_class_dset : str | None Dataset in the generation file dictating resource classes. None if no resource classes. res_class_bins : list | None List of two-entry lists dictating the resource class bins. None if no resource classes. cf_dset : str Dataset name from f_gen containing capacity factor mean values. lcoe_dset : str Dataset name from f_gen containing LCOE mean values. h5_dsets : list | None Optional list of additional datasets from the source h5 gen/econ files to aggregate. data_layers : None | dict Aggregation data layers. Must be a dictionary keyed by data label name. Each value must be another dictionary with "dset", "method", and "fpath". power_density : float | str | None Power density in MW/km2 or filepath to variable power density file. None will attempt to infer a constant power density from the generation meta data technology. Variable power density csvs must have "gid" and "power_density" columns where gid is the resource gid (typically wtk or nsrdb gid) and the power_density column is in MW/km2. friction_fpath : str | None Filepath to friction surface data (cost based exclusions). Must be paired with friction_dset. The friction data must be the same shape as the exclusions. Friction input creates a new output "mean_lcoe_friction" which is the nominal LCOE multiplied by the friction data. friction_dset : str | None Dataset name in friction_fpath for the friction surface data. Must be paired with friction_fpath. Must be same shape as exclusions. excl_area : float | None, optional Area of an exclusion pixel in km2. None will try to infer the area from the profile transform attribute in excl_fpath, by default None cap_cost_scale : str | None Optional LCOE scaling equation to implement "economies of scale". Equations must be in python string format and return a scalar value to multiply the capital cost by. Independent variables in the equation should match the names of the columns in the reV supply curve aggregation table. recalc_lcoe : bool Flag to re-calculate the LCOE from the multi-year mean capacity factor and annual energy production data. This requires several datasets to be aggregated in the h5_dsets input: system_capacity, fixed_charge_rate, capital_cost, fixed_operating_cost, and variable_operating_cost. Returns ------- summary : list List of dictionaries, each being an SC point summary. """ summary = [] with SupplyCurveExtent(excl_fpath, resolution=resolution) as sc: points = sc.points exclusion_shape = sc.exclusions.shape if gids is None: gids = sc.valid_sc_points(tm_dset) elif np.issubdtype(type(gids), np.number): gids = [gids] slice_lookup = sc.get_slice_lookup(gids) logger.debug('Starting SupplyCurveAggregation serial with ' 'supply curve {} gids'.format(len(gids))) cls._check_inclusion_mask(inclusion_mask, gids, exclusion_shape) # pre-extract handlers so they are not repeatedly initialized file_kwargs = {'econ_fpath': econ_fpath, 'data_layers': data_layers, 'power_density': power_density, 'excl_dict': excl_dict, 'area_filter_kernel': area_filter_kernel, 'min_area': min_area, 'friction_fpath': friction_fpath, 'friction_dset': friction_dset} with SupplyCurveAggFileHandler(excl_fpath, gen_fpath, **file_kwargs) as fh: temp = cls._get_res_gen_lcoe_data(fh.gen, res_class_dset, res_class_bins, cf_dset, lcoe_dset) res_data, res_class_bins, cf_data, lcoe_data = temp h5_dsets_data = cls._get_extra_dsets(fh.gen, h5_dsets) n_finished = 0 for gid in gids: gid_inclusions = cls._get_gid_inclusion_mask( inclusion_mask, gid, slice_lookup, resolution=resolution) for ri, res_bin in enumerate(res_class_bins): try: pointsum = GenerationSupplyCurvePoint.summarize( gid, fh.exclusions, fh.gen, tm_dset, gen_index, res_class_dset=res_data, res_class_bin=res_bin, cf_dset=cf_data, lcoe_dset=lcoe_data, h5_dsets=h5_dsets_data, data_layers=fh.data_layers, resolution=resolution, exclusion_shape=exclusion_shape, power_density=fh.power_density, args=args, excl_dict=excl_dict, inclusion_mask=gid_inclusions, excl_area=excl_area, close=False, friction_layer=fh.friction_layer, cap_cost_scale=cap_cost_scale, recalc_lcoe=recalc_lcoe) except EmptySupplyCurvePointError: logger.debug('SC point {} is empty'.format(gid)) else: pointsum['sc_point_gid'] = gid pointsum['sc_row_ind'] = points.loc[gid, 'row_ind'] pointsum['sc_col_ind'] = points.loc[gid, 'col_ind'] pointsum['res_class'] = ri summary.append(pointsum) logger.debug('Serial aggregation completed gid {}: ' '{} out of {} points complete' .format(gid, n_finished, len(gids))) n_finished += 1 return summary
[docs] def run_parallel(self, gen_fpath, args=None, max_workers=None, sites_per_worker=100): """Get the supply curve points aggregation summary using futures. Parameters ---------- gen_fpath : str Filepath to .h5 reV generation output results. args : tuple | list | None List of summary arguments to include. None defaults to all available args defined in the class attr. max_workers : int | None, optional Number of cores to run summary on. None is all available cpus, by default None sites_per_worker : int Number of sc_points to summarize on each worker, by default 100 Returns ------- summary : list List of dictionaries, each being an SC point summary. """ gen_index = self._parse_gen_index(gen_fpath) chunks = int(np.ceil(len(self.gids) / sites_per_worker)) chunks = np.array_split(self.gids, chunks) logger.info('Running supply curve point aggregation for ' 'points {} through {} at a resolution of {} ' 'on {} cores in {} chunks.' .format(self.gids[0], self.gids[-1], self._resolution, max_workers, len(chunks))) slice_lookup = None if self._inclusion_mask is not None: with SupplyCurveExtent(self._excl_fpath, resolution=self._resolution) as sc: assert sc.exclusions.shape == self._inclusion_mask.shape slice_lookup = sc.get_slice_lookup(self.gids) futures = [] summary = [] n_finished = 0 loggers = [__name__, 'reV.supply_curve.point_summary', 'reV'] with SpawnProcessPool(max_workers=max_workers, loggers=loggers) as exe: # iterate through split executions, submitting each to worker for gid_set in chunks: # submit executions and append to futures list chunk_incl_masks = None if self._inclusion_mask is not None: chunk_incl_masks = {} for gid in gid_set: rs, cs = slice_lookup[gid] chunk_incl_masks[gid] = self._inclusion_mask[rs, cs] futures.append(exe.submit( self.run_serial, self._excl_fpath, gen_fpath, self._tm_dset, gen_index, econ_fpath=self._econ_fpath, excl_dict=self._excl_dict, inclusion_mask=chunk_incl_masks, res_class_dset=self._res_class_dset, res_class_bins=self._res_class_bins, cf_dset=self._cf_dset, lcoe_dset=self._lcoe_dset, h5_dsets=self._h5_dsets, data_layers=self._data_layers, resolution=self._resolution, power_density=self._power_density, friction_fpath=self._friction_fpath, friction_dset=self._friction_dset, area_filter_kernel=self._area_filter_kernel, min_area=self._min_area, gids=gid_set, args=args, excl_area=self._excl_area, cap_cost_scale=self._cap_cost_scale, recalc_lcoe=self._recalc_lcoe)) # gather results for future in as_completed(futures): n_finished += 1 summary += future.result() if n_finished % 10 == 0: mem = psutil.virtual_memory() logger.info('Parallel aggregation futures collected: ' '{} out of {}. Memory usage is {:.3f} GB out ' 'of {:.3f} GB ({:.2f}% utilized).' .format(n_finished, len(chunks), mem.used / 1e9, mem.total / 1e9, 100 * mem.used / mem.total)) return summary
@staticmethod def _convert_bins(bins): """Convert a list of floats or ints to a list of two-entry bin bounds. Parameters ---------- bins : list | None List of floats or ints (bin edges) to convert to list of two-entry bin boundaries or list of two-entry bind boundaries in final format Returns ------- bins : list List of two-entry bin boundaries """ if bins is None: return None type_check = [isinstance(x, (list, tuple)) for x in bins] if all(type_check): return bins elif any(type_check): raise TypeError('Resource class bins has inconsistent ' 'entry type: {}'.format(bins)) else: bbins = [] for i, b in enumerate(sorted(bins)): if i < len(bins) - 1: bbins.append([b, bins[i + 1]]) return bbins @staticmethod def _summary_to_df(summary): """Convert the agg summary list to a DataFrame. Parameters ---------- summary : list List of dictionaries, each being an SC point summary. Returns ------- summary : DataFrame Summary of the SC points. """ summary = pd.DataFrame(summary) sort_by = [x for x in ('sc_point_gid', 'res_class') if x in summary] summary = summary.sort_values(sort_by) summary = summary.reset_index(drop=True) summary.index.name = 'sc_gid' return summary
[docs] def summarize(self, gen_fpath, args=None, max_workers=None, sites_per_worker=100): """ Get the supply curve points aggregation summary Parameters ---------- gen_fpath : str Filepath to .h5 reV generation output results. args : tuple | list | None List of summary arguments to include. None defaults to all available args defined in the class attr. max_workers : int | None, optional Number of cores to run summary on. None is all available cpus, by default None sites_per_worker : int Number of sc_points to summarize on each worker, by default 100 Returns ------- summary : list List of dictionaries, each being an SC point summary. """ if max_workers is None: max_workers = os.cpu_count() if max_workers == 1: gen_index = self._parse_gen_index(gen_fpath) afk = self._area_filter_kernel summary = self.run_serial(self._excl_fpath, gen_fpath, self._tm_dset, gen_index, econ_fpath=self._econ_fpath, excl_dict=self._excl_dict, inclusion_mask=self._inclusion_mask, res_class_dset=self._res_class_dset, res_class_bins=self._res_class_bins, cf_dset=self._cf_dset, lcoe_dset=self._lcoe_dset, h5_dsets=self._h5_dsets, data_layers=self._data_layers, resolution=self._resolution, power_density=self._power_density, friction_fpath=self._friction_fpath, friction_dset=self._friction_dset, area_filter_kernel=afk, min_area=self._min_area, gids=self.gids, args=args, excl_area=self._excl_area, cap_cost_scale=self._cap_cost_scale, recalc_lcoe=self._recalc_lcoe) else: summary = self.run_parallel(gen_fpath=gen_fpath, args=args, max_workers=max_workers, sites_per_worker=sites_per_worker) if not any(summary): e = ('Supply curve aggregation found no non-excluded SC points. ' 'Please check your exclusions or subset SC GID selection.') logger.error(e) raise EmptySupplyCurvePointError(e) summary = self._summary_to_df(summary) return summary
[docs] def run(self, out_fpath, gen_fpath=None, args=None, max_workers=None, sites_per_worker=100): """Run a supply curve aggregation. Parameters ---------- gen_fpath : str, optional Filepath to HDF5 file with ``reV`` generation output results. If ``None``, a simple aggregation without any generation, resource, or cost data is performed. .. Note:: If executing ``reV`` from the command line, this input can be set to ``"PIPELINE"`` to parse the output from one of these preceding pipeline steps: ``multi-year``, ``collect``, or ``econ``. However, note that duplicate executions of any of these commands within the pipeline may invalidate this parsing, meaning the `econ_fpath` input will have to be specified manually. By default, ``None``. args : tuple | list, optional List of columns to include in summary output table. ``None`` defaults to all available args defined in the :class:`~reV.supply_curve.sc_aggregation.SupplyCurveAggregation` documentation. By default, ``None``. max_workers : int, optional Number of cores to run summary on. ``None`` is all available CPUs. By default, ``None``. sites_per_worker : int, optional Number of sc_points to summarize on each worker. By default, ``100``. Returns ------- str Path to output CSV file containing supply curve aggregation. """ if gen_fpath is None: out = Aggregation.run( self._excl_fpath, self._res_fpath, self._tm_dset, excl_dict=self._excl_dict, resolution=self._resolution, excl_area=self._excl_area, area_filter_kernel=self._area_filter_kernel, min_area=self._min_area, pre_extract_inclusions=self._pre_extract_inclusions, max_workers=max_workers, sites_per_worker=sites_per_worker) summary = out['meta'] else: summary = self.summarize(gen_fpath=gen_fpath, args=args, max_workers=max_workers, sites_per_worker=sites_per_worker) out_fpath = _format_sc_agg_out_fpath(out_fpath) summary.to_csv(out_fpath) return out_fpath
def _format_sc_agg_out_fpath(out_fpath): """Add CSV file ending and replace underscore, if necessary.""" if not out_fpath.endswith(".csv"): out_fpath = '{}.csv'.format(out_fpath) project_dir, out_fn = os.path.split(out_fpath) out_fn = out_fn.replace("supply_curve_aggregation", "supply-curve-aggregation") return os.path.join(project_dir, out_fn) def _warn_about_large_datasets(gen, dset): """Warn user about multi-dimensional datasets in passthrough datasets""" dset_shape = gen.shapes.get(dset, (1,)) if len(dset_shape) > 1: msg = ("Generation dataset {!r} is not 1-dimensional (shape: {})." "You may run into memory errors during aggregation - use " "rep-profiles for aggregating higher-order datasets instead!" .format(dset, dset_shape)) logger.warning(msg) warn(msg, UserWarning)