# -*- coding: utf-8 -*-
"""
reV supply curve points frameworks.
"""
import logging
from abc import ABC
from warnings import warn
import numpy as np
import pandas as pd
from rex.multi_time_resource import MultiTimeResource
from rex.resource import BaseResource, Resource
from rex.utilities.utilities import jsonify_dict
from reV.econ.economies_of_scale import EconomiesOfScale
from reV.econ.utilities import lcoe_fcr
from reV.handlers.exclusions import LATITUDE, LONGITUDE, ExclusionLayers
from reV.supply_curve.exclusions import ExclusionMask, ExclusionMaskFromDict
from reV.utilities import ResourceMetaField, SupplyCurveField
from reV.utilities.exceptions import (
DataShapeError,
EmptySupplyCurvePointError,
FileInputError,
InputWarning,
OutputWarning,
SupplyCurveInputError,
)
logger = logging.getLogger(__name__)
[docs]class AbstractSupplyCurvePoint(ABC):
"""
Abstract SC point based on only the point gid, SC shape, and resolution.
"""
def __init__(self, gid, exclusion_shape, resolution=64):
"""
Parameters
----------
gid : int
gid for supply curve point to analyze.
exclusion_shape : tuple
Shape of the full exclusions extent (rows, cols).
resolution : int
Number of exclusion points per SC point along an axis.
This number**2 is the total number of exclusion points per
SC point.
"""
self._gid = gid
self._resolution = resolution
self._rows = self._cols = self._sc_row_ind = self._sc_col_ind = None
self._parse_sc_row_col_ind(resolution, exclusion_shape)
self._parse_slices(resolution, exclusion_shape)
@staticmethod
def _ordered_unique(seq):
"""Get a list of unique values in the same order as the input sequence.
Parameters
----------
seq : list | tuple
Sequence of values.
Returns
-------
seq : list
List of unique values in seq input with original order.
"""
seen = set()
return [x for x in seq if not (x in seen or seen.add(x))]
def _parse_sc_row_col_ind(self, resolution, exclusion_shape):
"""Parse SC row and column index.
Parameters
----------
resolution : int | None
SC resolution, must be input in combination with gid.
exclusion_shape : tuple
Shape of the exclusions extent (rows, cols).
"""
n_sc_cols = int(np.ceil(exclusion_shape[1] / resolution))
self._sc_row_ind = self._gid // n_sc_cols
self._sc_col_ind = self._gid % n_sc_cols
def _parse_slices(self, resolution, exclusion_shape):
"""Parse row and column resource/generation grid slices.
Parameters
----------
resolution : int | None
SC resolution, must be input in combination with gid.
exclusion_shape : tuple
Shape of the exclusions extent (rows, cols).
"""
inds = self.get_agg_slices(self._gid, exclusion_shape, resolution)
self._rows, self._cols = inds
@property
def gid(self):
"""Supply curve point gid"""
return self._gid
@property
def sc_point_gid(self):
"""
Supply curve point gid
Returns
-------
int
"""
return self._gid
@property
def sc_row_ind(self):
"""int: Supply curve row index"""
return self._sc_row_ind
@property
def sc_col_ind(self):
"""int: Supply curve column index"""
return self._sc_col_ind
@property
def resolution(self):
"""Get the supply curve grid aggregation resolution"""
return self._resolution
@property
def rows(self):
"""Get the rows of the exclusions layer associated with this SC point.
Returns
-------
rows : slice
Row slice to index the high-res layer (exclusions layer) for the
gid in the agg layer (supply curve layer).
"""
return self._rows
@property
def cols(self):
"""Get the cols of the exclusions layer associated with this SC point.
Returns
-------
cols : slice
Column slice to index the high-res layer (exclusions layer) for the
gid in the agg layer (supply curve layer).
"""
return self._cols
[docs] @staticmethod
def get_agg_slices(gid, shape, resolution):
"""Get the row, col slices of an aggregation gid.
Parameters
----------
gid : int
Gid of interest in the aggregated layer.
shape : tuple
(row, col) shape tuple of the underlying high-res layer.
resolution : int
Resolution of the aggregation: number of pixels in 1D being
aggregated.
Returns
-------
row_slice : slice
Row slice to index the high-res layer for the gid in the agg layer.
col_slice : slice
Col slice to index the high-res layer for the gid in the agg layer.
"""
nrows = int(np.ceil(shape[0] / resolution))
ncols = int(np.ceil(shape[1] / resolution))
super_shape = (nrows, ncols)
arr = np.arange(nrows * ncols).reshape(super_shape)
try:
loc = np.where(arr == gid)
row = loc[0][0]
col = loc[1][0]
except IndexError as exc:
msg = (
"Gid {} out of bounds for extent shape {} and "
"resolution {}.".format(gid, shape, resolution)
)
raise IndexError(msg) from exc
if row + 1 != nrows:
row_slice = slice(row * resolution, (row + 1) * resolution)
else:
row_slice = slice(row * resolution, shape[0])
if col + 1 != ncols:
col_slice = slice(col * resolution, (col + 1) * resolution)
else:
col_slice = slice(col * resolution, shape[1])
return row_slice, col_slice
[docs]class SupplyCurvePoint(AbstractSupplyCurvePoint):
"""Generic single SC point based on exclusions, resolution, and techmap"""
def __init__(
self,
gid,
excl,
tm_dset,
excl_dict=None,
inclusion_mask=None,
resolution=64,
excl_area=None,
exclusion_shape=None,
close=True,
):
"""
Parameters
----------
gid : int
gid for supply curve point to analyze.
excl : str | list | tuple | ExclusionMask
Filepath(s) to exclusions h5 or ExclusionMask file handler.
tm_dset : str
Dataset name in the exclusions file containing the
exclusions-to-resource mapping data.
excl_dict : dict | None
Dictionary of exclusion keyword arugments of the format
{layer_dset_name: {kwarg: value}} where layer_dset_name is a
dataset in the exclusion h5 file and kwarg is a keyword argument to
the reV.supply_curve.exclusions.LayerMask class.
None if excl input is pre-initialized.
inclusion_mask : np.ndarray
2D array pre-extracted inclusion mask where 1 is included and 0 is
excluded. The shape of this will be checked against the input
resolution.
resolution : int
Number of exclusion points per SC point along an axis.
This number**2 is the total number of exclusion points per
SC point.
excl_area : float | None, optional
Area of an exclusion pixel in km2. None will try to infer the area
from the profile transform attribute in excl_fpath, by default None
exclusion_shape : tuple
Shape of the full exclusions extent (rows, cols). Inputing this
will speed things up considerably.
close : bool
Flag to close object file handlers on exit.
"""
self._excl_dict = excl_dict
self._close = close
self._excl_fpath, self._excls = self._parse_excl_file(excl)
if exclusion_shape is None:
exclusion_shape = self.exclusions.shape
super().__init__(gid, exclusion_shape, resolution=resolution)
self._gids = self._parse_techmap(tm_dset)
self._h5_gids = self._gids
self._h5_gid_set = None
self._incl_mask = inclusion_mask
self._incl_mask_flat = None
if inclusion_mask is not None:
msg = (
"Bad inclusion mask input shape of {} with stated "
"resolution of {}".format(inclusion_mask.shape, resolution)
)
assert len(inclusion_mask.shape) == 2, msg
assert inclusion_mask.shape[0] <= resolution, msg
assert inclusion_mask.shape[1] <= resolution, msg
assert inclusion_mask.size == len(self._gids), msg
self._incl_mask = inclusion_mask.copy()
self._centroid = None
self._excl_area = excl_area
self._check_excl()
@staticmethod
def _parse_excl_file(excl):
"""Parse excl filepath input or handler object and set to attrs.
Parameters
----------
excl : str | ExclusionMask
Filepath to exclusions geotiff or ExclusionMask handler
Returns
-------
excl_fpath : str | list | tuple
Filepath(s) for exclusions file
exclusions : ExclusionMask | None
Exclusions mask if input is already an open handler or None if it
is to be lazy instantiated.
"""
if isinstance(excl, (str, list, tuple)):
excl_fpath = excl
exclusions = None
elif isinstance(excl, ExclusionMask):
excl_fpath = excl.excl_h5.h5_file
exclusions = excl
else:
raise SupplyCurveInputError(
"SupplyCurvePoints needs an "
"exclusions file path, or "
"ExclusionMask handler but "
"received: {}".format(type(excl))
)
return excl_fpath, exclusions
def _parse_techmap(self, tm_dset):
"""Parse data from the tech map file (exclusions to resource mapping).
Raise EmptySupplyCurvePointError if there are no valid resource points
in this SC point.
Parameters
----------
tm_dset : str
Dataset name in the exclusions file containing the
exclusions-to-resource mapping data.
Returns
-------
res_gids : np.ndarray
1D array with length == number of exclusion points. reV resource
gids (native resource index) from the original resource data
corresponding to the tech exclusions.
"""
res_gids = self.exclusions.excl_h5[tm_dset, self.rows, self.cols]
res_gids = res_gids.astype(np.int32).flatten()
if (res_gids != -1).sum() == 0:
emsg = (
"Supply curve point gid {} has no viable exclusion points "
'based on exclusions file: "{}"'.format(
self._gid, self._excl_fpath
)
)
raise EmptySupplyCurvePointError(emsg)
return res_gids
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
if type is not None:
raise
[docs] def close(self):
"""Close all file handlers."""
if self._close:
if self._excls is not None:
self._excls.close()
@property
def exclusions(self):
"""Get the exclusions object.
Returns
-------
_excls : ExclusionMask
ExclusionMask h5 handler object.
"""
if self._excls is None:
self._excls = ExclusionMaskFromDict(
self._excl_fpath, layers_dict=self._excl_dict
)
return self._excls
@property
def centroid(self):
"""Get the supply curve point centroid coordinate.
Returns
-------
centroid : tuple
SC point centroid (lat, lon).
"""
if self._centroid is None:
lats = self.exclusions.excl_h5[LATITUDE, self.rows, self.cols]
lons = self.exclusions.excl_h5[LONGITUDE, self.rows, self.cols]
self._centroid = (lats.mean(), lons.mean())
return self._centroid
@property
def pixel_area(self):
"""The area in km2 of a single exclusion pixel. If this value was not
provided on initialization, it is determined from the profile of the
exclusion file.
Returns
-------
float
"""
if self._excl_area is None:
with ExclusionLayers(self._excl_fpath) as f:
self._excl_area = f.pixel_area
return self._excl_area
@property
def area(self):
"""Get the non-excluded resource area of the supply curve point in the
current resource class.
Returns
-------
area : float
Non-excluded resource/generation area in square km.
"""
mask = self._gids != -1
area = np.sum(self.include_mask_flat[mask]) * self.pixel_area
return area
@property
def latitude(self):
"""Get the SC point latitude"""
return self.centroid[0]
@property
def longitude(self):
"""Get the SC point longitude"""
return self.centroid[1]
@property
def n_gids(self):
"""
Get the total number of not fully excluded pixels associated with the
available resource/generation gids at the given sc gid.
Returns
-------
n_gids : list
"""
mask = self._gids != -1
n_gids = np.sum(self.include_mask_flat[mask] > 0)
return n_gids
@property
def include_mask(self):
"""Get the 2D inclusion mask (normalized with expected range: [0, 1]
where 1 is included and 0 is excluded).
Returns
-------
np.ndarray
"""
if self._incl_mask is None:
self._incl_mask = self.exclusions[self.rows, self.cols]
# make sure exclusion pixels outside resource extent are excluded
out_of_extent = self._gids.reshape(self._incl_mask.shape) == -1
self._incl_mask[out_of_extent] = 0.0
if self._incl_mask.max() > 1:
w = (
"Exclusions data max value is > 1: {}".format(
self._incl_mask.max()
),
InputWarning,
)
logger.warning(w)
warn(w)
return self._incl_mask
@property
def include_mask_flat(self):
"""Get the flattened inclusion mask (normalized with expected
range: [0, 1] where 1 is included and 0 is excluded).
Returns
-------
np.ndarray
"""
if self._incl_mask_flat is None:
self._incl_mask_flat = self.include_mask.flatten()
return self._incl_mask_flat
@property
def bool_mask(self):
"""Get a boolean inclusion mask (True if excl point is not excluded).
Returns
-------
mask : np.ndarray
Mask with length equal to the flattened exclusion shape
"""
return self._gids != -1
@property
def h5(self):
"""
placeholder for h5 Resource handler object
"""
@property
def h5_gid_set(self):
"""Get list of unique h5 gids corresponding to this sc point.
Returns
-------
h5_gids : list
List of h5 gids.
"""
if self._h5_gid_set is None:
self._h5_gid_set = self._ordered_unique(self._h5_gids)
if -1 in self._h5_gid_set:
self._h5_gid_set.remove(-1)
return self._h5_gid_set
@property
def summary(self):
"""
Placeholder for Supply curve point's meta data summary
"""
def _check_excl(self):
"""
Check to see if supply curve point is fully excluded
"""
if all(self.include_mask_flat[self.bool_mask] == 0):
msg = "Supply curve point gid {} is completely excluded!".format(
self._gid
)
raise EmptySupplyCurvePointError(msg)
[docs] def exclusion_weighted_mean(self, arr, drop_nan=True):
"""
Calc the exclusions-weighted mean value of an array of resource data.
Parameters
----------
arr : np.ndarray
Array of resource data.
drop_nan : bool
Flag to drop nan values from the mean calculation (only works for
1D arr input, profiles should not have NaN's)
Returns
-------
mean : float | np.ndarray
Mean of arr masked by the binary exclusions then weighted by
the non-zero exclusions. This will be a 1D numpy array if the
input data is a 2D numpy array (averaged along axis=1)
"""
if len(arr.shape) == 2:
x = arr[:, self._gids[self.bool_mask]].astype("float32")
incl = self.include_mask_flat[self.bool_mask]
x *= incl
mean = x.sum(axis=1) / incl.sum()
else:
x = arr[self._gids[self.bool_mask]].astype("float32")
incl = self.include_mask_flat[self.bool_mask]
if np.isnan(x).all():
return np.nan
if drop_nan and np.isnan(x).any():
nan_mask = np.isnan(x)
x = x[~nan_mask]
incl = incl[~nan_mask]
x *= incl
mean = x.sum() / incl.sum()
return mean
[docs] def mean_wind_dirs(self, arr):
"""
Calc the mean wind directions at every time-step
Parameters
----------
arr : np.ndarray
Array of wind direction data.
Returns
-------
mean_wind_dirs : np.ndarray | float
Mean wind direction of arr masked by the binary exclusions
"""
incl = self.include_mask_flat[self.bool_mask]
gids = self._gids[self.bool_mask]
if len(arr.shape) == 2:
arr_slice = (slice(None), gids)
ax = 1
else:
arr_slice = gids
ax = 0
angle = np.radians(arr[arr_slice], dtype=np.float32)
sin = np.mean(np.sin(angle) * incl, axis=ax)
cos = np.mean(np.cos(angle) * incl, axis=ax)
mean_wind_dirs = np.degrees(np.arctan2(sin, cos))
mask = mean_wind_dirs < 0
mean_wind_dirs[mask] += 360
return mean_wind_dirs
[docs] def aggregate(self, arr):
"""
Calc sum (aggregation) of the resource data.
Parameters
----------
arr : np.ndarray
Array of resource data.
Returns
-------
agg : float
Sum of arr masked by the binary exclusions
"""
if len(arr.shape) == 2:
x = arr[:, self._gids[self.bool_mask]].astype("float32")
ax = 1
else:
x = arr[self._gids[self.bool_mask]].astype("float32")
ax = 0
x *= self.include_mask_flat[self.bool_mask]
agg = x.sum(axis=ax)
return agg
[docs] @classmethod
def sc_mean(
cls,
gid,
excl,
tm_dset,
data,
excl_dict=None,
resolution=64,
exclusion_shape=None,
close=True,
):
"""
Compute exclusions weight mean for the sc point from data
Parameters
----------
gid : int
gid for supply curve point to analyze.
excl : str | ExclusionMask
Filepath to exclusions h5 or ExclusionMask file handler.
tm_dset : str
Dataset name in the exclusions file containing the
exclusions-to-resource mapping data.
data : ndarray | ResourceDataset
Array of data or open dataset handler to apply exclusions too
excl_dict : dict | None
Dictionary of exclusion keyword arugments of the format
{layer_dset_name: {kwarg: value}} where layer_dset_name is a
dataset in the exclusion h5 file and kwarg is a keyword argument to
the reV.supply_curve.exclusions.LayerMask class.
None if excl input is pre-initialized.
resolution : int
Number of exclusion points per SC point along an axis.
This number**2 is the total number of exclusion points per
SC point.
exclusion_shape : tuple
Shape of the full exclusions extent (rows, cols). Inputing this
will speed things up considerably.
close : bool
Flag to close object file handlers on exit
Returns
-------
ndarray
Exclusions weighted means of data for supply curve point
"""
kwargs = {
"excl_dict": excl_dict,
"resolution": resolution,
"exclusion_shape": exclusion_shape,
"close": close,
}
with cls(gid, excl, tm_dset, **kwargs) as point:
means = point.exclusion_weighted_mean(data)
return means
[docs] @classmethod
def sc_sum(
cls,
gid,
excl,
tm_dset,
data,
excl_dict=None,
resolution=64,
exclusion_shape=None,
close=True,
):
"""
Compute the aggregate (sum) of data for the sc point
Parameters
----------
gid : int
gid for supply curve point to analyze.
excl : str | ExclusionMask
Filepath to exclusions h5 or ExclusionMask file handler.
tm_dset : str
Dataset name in the exclusions file containing the
exclusions-to-resource mapping data.
data : ndarray | ResourceDataset
Array of data or open dataset handler to apply exclusions too
excl_dict : dict | None
Dictionary of exclusion keyword arugments of the format
{layer_dset_name: {kwarg: value}} where layer_dset_name is a
dataset in the exclusion h5 file and kwarg is a keyword argument to
the reV.supply_curve.exclusions.LayerMask class.
None if excl input is pre-initialized.
resolution : int
Number of exclusion points per SC point along an axis.
This number**2 is the total number of exclusion points per
SC point.
exclusion_shape : tuple
Shape of the full exclusions extent (rows, cols). Inputing this
will speed things up considerably.
close : bool
Flag to close object file handlers on exit.
Returns
-------
ndarray
Sum / aggregation of data for supply curve point
"""
kwargs = {
"excl_dict": excl_dict,
"resolution": resolution,
"exclusion_shape": exclusion_shape,
"close": close,
}
with cls(gid, excl, tm_dset, **kwargs) as point:
agg = point.aggregate(data)
return agg
@staticmethod
def _mode(data):
"""
Compute the mode of the data vector and return a single value
Parameters
----------
data : ndarray
data layer vector to compute mode for
Returns
-------
float | int
Mode of data
"""
if not data.size:
return None
# pd series is more flexible with non-numeric than stats mode
return pd.Series(data).mode().values[0]
@staticmethod
def _categorize(data, incl_mult):
"""
Extract the sum of inclusion scalar values (where 1 is
included, 0 is excluded, and 0.7 is included with 70 percent of
available land) for each unique (categorical value) in data
Parameters
----------
data : ndarray
Vector of categorical values
incl_mult : ndarray
Vector of inclusion values
Returns
-------
str
Jsonified string of the dictionary mapping categorical values to
total inclusions
"""
data = {
category: float(incl_mult[(data == category)].sum())
for category in np.unique(data)
}
data = jsonify_dict(data)
return data
@classmethod
def _agg_data_layer_method(cls, data, incl_mult, method):
"""Aggregate the data array using specified method.
Parameters
----------
data : np.ndarray | None
Data array that will be flattened and operated on using method.
This must be the included data. Exclusions should be applied
before this method.
incl_mult : np.ndarray | None
Scalar exclusion data for methods with exclusion-weighted
aggregation methods. Shape must match input data.
method : str
Aggregation method (mode, mean, max, min, sum, category)
Returns
-------
data : float | int | str | None
Result of applying method to data.
"""
method_func = {
"mode": cls._mode,
"mean": np.mean,
"max": np.max,
"min": np.min,
"sum": np.sum,
"category": cls._categorize,
}
if data is not None:
method = method.lower()
if method not in method_func:
e = (
"Cannot recognize data layer agg method: "
'"{}". Can only {}'.format(method, list(method_func))
)
logger.error(e)
raise ValueError(e)
if len(data.shape) > 1:
data = data.flatten()
if data.shape != incl_mult.shape:
e = (
"Cannot aggregate data with shape that doesnt "
"match excl mult!"
)
logger.error(e)
raise DataShapeError(e)
if method == "category":
data = method_func["category"](data, incl_mult)
elif method in ["mean", "sum"]:
data = data * incl_mult
data = method_func[method](data)
else:
data = method_func[method](data)
return data
[docs] def agg_data_layers(self, summary, data_layers):
"""Perform additional data layer aggregation. If there is no valid data
in the included area, the data layer will be taken from the full SC
point extent (ignoring exclusions). If there is still no valid data,
a warning will be raised and the data layer will have a NaN/None value.
Parameters
----------
summary : dict
Dictionary of summary outputs for this sc point.
data_layers : None | dict
Aggregation data layers. Must be a dictionary keyed by data label
name. Each value must be another dictionary with "dset", "method",
and "fpath".
Returns
-------
summary : dict
Dictionary of summary outputs for this sc point. A new entry for
each data layer is added.
"""
if data_layers is not None:
for name, attrs in data_layers.items():
excl_fp = attrs.get("fpath", self._excl_fpath)
if excl_fp != self._excl_fpath:
fh = ExclusionLayers(attrs["fpath"])
else:
fh = self.exclusions.excl_h5
raw = fh[attrs["dset"], self.rows, self.cols]
nodata = fh.get_nodata_value(attrs["dset"])
data = raw.flatten()[self.bool_mask]
incl_mult = self.include_mask_flat[self.bool_mask].copy()
if nodata is not None:
valid_data_mask = data != nodata
data = data[valid_data_mask]
incl_mult = incl_mult[valid_data_mask]
if not data.size:
m = (
'Data layer "{}" has no valid data for '
"SC point gid {} because of exclusions "
"and/or nodata values in the data layer.".format(
name, self._gid
)
)
logger.debug(m)
data = self._agg_data_layer_method(
data, incl_mult, attrs["method"]
)
summary[name] = data
if excl_fp != self._excl_fpath:
fh.close()
return summary
[docs]class AggregationSupplyCurvePoint(SupplyCurvePoint):
"""Generic single SC point to aggregate data from an h5 file."""
def __init__(
self,
gid,
excl,
agg_h5,
tm_dset,
excl_dict=None,
inclusion_mask=None,
resolution=64,
excl_area=None,
exclusion_shape=None,
close=True,
gen_index=None,
apply_exclusions=True,
):
"""
Parameters
----------
gid : int
gid for supply curve point to analyze.
excl : str | ExclusionMask
Filepath to exclusions h5 or ExclusionMask file handler.
agg_h5 : str | Resource
Filepath to .h5 file to aggregate or Resource handler
tm_dset : str
Dataset name in the exclusions file containing the
exclusions-to-resource mapping data.
excl_dict : dict | None
Dictionary of exclusion keyword arugments of the format
{layer_dset_name: {kwarg: value}} where layer_dset_name is a
dataset in the exclusion h5 file and kwarg is a keyword argument to
the reV.supply_curve.exclusions.LayerMask class.
None if excl input is pre-initialized.
inclusion_mask : np.ndarray
2D array pre-extracted inclusion mask where 1 is included and 0 is
excluded. The shape of this will be checked against the input
resolution.
resolution : int
Number of exclusion points per SC point along an axis.
This number**2 is the total number of exclusion points per
SC point.
excl_area : float | None, optional
Area of an exclusion pixel in km2. None will try to infer the area
from the profile transform attribute in excl_fpath, by default None
exclusion_shape : tuple
Shape of the full exclusions extent (rows, cols). Inputing this
will speed things up considerably.
close : bool
Flag to close object file handlers on exit.
gen_index : np.ndarray
Array of generation gids with array index equal to resource gid.
Array value is -1 if the resource index was not used in the
generation run.
apply_exclusions : bool
Flag to apply exclusions to the resource / generation gid's on
initialization.
"""
super().__init__(
gid,
excl,
tm_dset,
excl_dict=excl_dict,
inclusion_mask=inclusion_mask,
resolution=resolution,
excl_area=excl_area,
exclusion_shape=exclusion_shape,
close=close,
)
self._h5_fpath, self._h5 = self._parse_h5_file(agg_h5)
if gen_index is not None:
self._gids, _ = self._map_gen_gids(self._gids, gen_index)
self._h5_gids = self._gids
if (self._h5_gids != -1).sum() == 0:
emsg = (
"Supply curve point gid {} has no viable exclusion "
'points based on exclusions file: "{}"'.format(
self._gid, self._excl_fpath
)
)
raise EmptySupplyCurvePointError(emsg)
if apply_exclusions:
self._apply_exclusions()
@staticmethod
def _parse_h5_file(h5):
"""
Parse .h5 filepath input or handler object and set to attrs.
Parameters
----------
h5 : str | Resource
Filepath to .h5 file to aggregate or Resource handler
Returns
-------
h5_fpath : str
Filepath for .h5 file to aggregate
h5 : Resource | None
Resource if input is already an open handler or None if it
is to be lazy instantiated.
"""
if isinstance(h5, str):
h5_fpath = h5
h5 = None
elif issubclass(h5.__class__, BaseResource):
h5_fpath = h5.h5_file
elif issubclass(h5.__class__, MultiTimeResource):
h5_fpath = h5.h5_files
else:
raise SupplyCurveInputError(
"SupplyCurvePoints needs a "
".h5 file path, or "
"Resource handler but "
"received: {}".format(type(h5))
)
return h5_fpath, h5
def _apply_exclusions(self):
"""Apply exclusions by masking the generation and resource gid arrays.
This removes all res/gen entries that are masked by the exclusions or
resource bin."""
# exclusions mask is False where excluded
exclude = self.include_mask_flat == 0
self._gids[exclude] = -1
self._h5_gids[exclude] = -1
if (self._gids != -1).sum() == 0:
msg = "Supply curve point gid {} is completely excluded!".format(
self._gid
)
raise EmptySupplyCurvePointError(msg)
[docs] def close(self):
"""Close all file handlers."""
if self._close:
if self._excls is not None:
self._excls.close()
if self._h5 is not None:
self._h5.close()
@staticmethod
def _map_gen_gids(res_gids, gen_index):
"""
Map resource gids from techmap to gen gids in .h5 source file
Parameters
----------
res_gids : ndarray
resource gids from techmap
gen_index : ndarray
Equivalent gen gids to resource gids
Returns
-------
gen_gids : ndarray
gen gid to excl mapping
res_gids : ndarray
updated resource gid to excl mapping
"""
mask = (res_gids >= len(gen_index)) | (res_gids == -1)
res_gids[mask] = -1
gen_gids = gen_index[res_gids]
gen_gids[mask] = -1
res_gids[(gen_gids == -1)] = -1
return gen_gids, res_gids
@property
def h5(self):
"""
h5 Resource handler object
Returns
-------
_h5 : Resource
Resource h5 handler object.
"""
if self._h5 is None and "*" in self._h5_fpath:
self._h5 = MultiTimeResource(self._h5_fpath)
elif self._h5 is None:
self._h5 = Resource(self._h5_fpath)
return self._h5
@property
def country(self):
"""Get the SC point country based on the resource meta data."""
country = None
county_not_none = self.county is not None
if ResourceMetaField.COUNTRY in self.h5.meta and county_not_none:
# make sure country and county are coincident
counties = self.h5.meta.loc[
self.h5_gid_set, ResourceMetaField.COUNTY
].values
iloc = np.where(counties == self.county)[0][0]
country = self.h5.meta.loc[
self.h5_gid_set, ResourceMetaField.COUNTRY
].values
country = country[iloc]
elif ResourceMetaField.COUNTRY in self.h5.meta:
country = self.h5.meta.loc[
self.h5_gid_set, ResourceMetaField.COUNTRY
].mode()
country = country.values[0]
return country
@property
def state(self):
"""Get the SC point state based on the resource meta data."""
state = None
if ResourceMetaField.STATE in self.h5.meta and self.county is not None:
# make sure state and county are coincident
counties = self.h5.meta.loc[
self.h5_gid_set, ResourceMetaField.COUNTY
].values
iloc = np.where(counties == self.county)[0][0]
state = self.h5.meta.loc[
self.h5_gid_set, ResourceMetaField.STATE
].values
state = state[iloc]
elif ResourceMetaField.STATE in self.h5.meta:
state = self.h5.meta.loc[
self.h5_gid_set, ResourceMetaField.STATE
].mode()
state = state.values[0]
return state
@property
def county(self):
"""Get the SC point county based on the resource meta data."""
county = None
if ResourceMetaField.COUNTY in self.h5.meta:
county = self.h5.meta.loc[
self.h5_gid_set, ResourceMetaField.COUNTY
].mode()
county = county.values[0]
return county
@property
def elevation(self):
"""Get the SC point elevation based on the resource meta data."""
elevation = None
if ResourceMetaField.ELEVATION in self.h5.meta:
elevation = self.h5.meta.loc[
self.h5_gid_set, ResourceMetaField.ELEVATION
].mean()
return elevation
@property
def timezone(self):
"""Get the SC point timezone based on the resource meta data."""
timezone = None
county_not_none = self.county is not None
if ResourceMetaField.TIMEZONE in self.h5.meta and county_not_none:
# make sure timezone flag and county are coincident
counties = self.h5.meta.loc[
self.h5_gid_set, ResourceMetaField.COUNTY
].values
iloc = np.where(counties == self.county)[0][0]
timezone = self.h5.meta.loc[
self.h5_gid_set, ResourceMetaField.TIMEZONE
].values
timezone = timezone[iloc]
elif ResourceMetaField.TIMEZONE in self.h5.meta:
timezone = self.h5.meta.loc[
self.h5_gid_set, ResourceMetaField.TIMEZONE
].mode()
timezone = timezone.values[0]
return timezone
@property
def offshore(self):
"""Get the SC point offshore flag based on the resource meta data
(if offshore column is present)."""
offshore = None
county_not_none = self.county is not None
if ResourceMetaField.OFFSHORE in self.h5.meta and county_not_none:
# make sure offshore flag and county are coincident
counties = self.h5.meta.loc[
self.h5_gid_set, ResourceMetaField.COUNTY
].values
iloc = np.where(counties == self.county)[0][0]
offshore = self.h5.meta.loc[
self.h5_gid_set, ResourceMetaField.OFFSHORE
].values
offshore = offshore[iloc]
elif ResourceMetaField.OFFSHORE in self.h5.meta:
offshore = self.h5.meta.loc[
self.h5_gid_set, ResourceMetaField.OFFSHORE
].mode()
offshore = offshore.values[0]
return offshore
@property
def gid_counts(self):
"""Get the sum of the inclusion values in each resource/generation gid
corresponding to this sc point. The sum of the gid counts can be less
than the value provided by n_gids if fractional exclusion/inclusions
are provided.
Returns
-------
gid_counts : list
"""
gid_counts = [
self.include_mask_flat[(self._h5_gids == gid)].sum()
for gid in self.h5_gid_set
]
return gid_counts
@property
def summary(self):
"""
Supply curve point's meta data summary
Returns
-------
pandas.Series
List of supply curve point's meta data
"""
meta = {
SupplyCurveField.SC_POINT_GID: self.sc_point_gid,
SupplyCurveField.SOURCE_GIDS: self.h5_gid_set,
SupplyCurveField.GID_COUNTS: self.gid_counts,
SupplyCurveField.N_GIDS: self.n_gids,
SupplyCurveField.AREA_SQ_KM: self.area,
SupplyCurveField.LATITUDE: self.latitude,
SupplyCurveField.LONGITUDE: self.longitude,
SupplyCurveField.COUNTRY: self.country,
SupplyCurveField.STATE: self.state,
SupplyCurveField.COUNTY: self.county,
SupplyCurveField.ELEVATION: self.elevation,
SupplyCurveField.TIMEZONE: self.timezone,
}
meta = pd.Series(meta)
return meta
[docs] @classmethod
def run(
cls,
gid,
excl,
agg_h5,
tm_dset,
*agg_dset,
agg_method="mean",
excl_dict=None,
inclusion_mask=None,
resolution=64,
excl_area=None,
exclusion_shape=None,
close=True,
gen_index=None,
):
"""
Compute exclusions weight mean for the sc point from data
Parameters
----------
gid : int
gid for supply curve point to analyze.
excl : str | ExclusionMask
Filepath to exclusions h5 or ExclusionMask file handler.
agg_h5 : str | Resource
Filepath to .h5 file to aggregate or Resource handler
tm_dset : str
Dataset name in the exclusions file containing the
exclusions-to-resource mapping data.
agg_dset : str
Dataset to aggreate, can supply multiple datasets or no datasets.
The datasets should be scalar values for each site. This method
cannot aggregate timeseries data.
agg_method : str
Aggregation method, either mean or sum/aggregate
excl_dict : dict | None
Dictionary of exclusion keyword arugments of the format
{layer_dset_name: {kwarg: value}} where layer_dset_name is a
dataset in the exclusion h5 file and kwarg is a keyword argument to
the reV.supply_curve.exclusions.LayerMask class.
None if excl input is pre-initialized.
inclusion_mask : np.ndarray
2D array pre-extracted inclusion mask where 1 is included and 0 is
excluded. The shape of this will be checked against the input
resolution.
resolution : int
Number of exclusion points per SC point along an axis.
This number**2 is the total number of exclusion points per
SC point.
excl_area : float | None, optional
Area of an exclusion pixel in km2. None will try to infer the area
from the profile transform attribute in excl_fpath, by default None
exclusion_shape : tuple
Shape of the full exclusions extent (rows, cols). Inputing this
will speed things up considerably.
close : bool
Flag to close object file handlers on exit.
gen_index : np.ndarray
Array of generation gids with array index equal to resource gid.
Array value is -1 if the resource index was not used in the
generation run.
Returns
-------
out : dict
Given datasets and meta data aggregated to supply curve points
"""
if isinstance(agg_dset, str):
agg_dset = (agg_dset,)
kwargs = {
"excl_dict": excl_dict,
"inclusion_mask": inclusion_mask,
"resolution": resolution,
"excl_area": excl_area,
"exclusion_shape": exclusion_shape,
"close": close,
"gen_index": gen_index,
}
with cls(gid, excl, agg_h5, tm_dset, **kwargs) as point:
if agg_method.lower().startswith("mean"):
agg_method = point.exclusion_weighted_mean
elif agg_method.lower().startswith(("sum", "agg")):
agg_method = point.aggregate
elif "wind_dir" in agg_method.lower():
agg_method = point.mean_wind_dirs
else:
msg = (
"Aggregation method must be either mean, "
"sum/aggregate, or wind_dir"
)
logger.error(msg)
raise ValueError(msg)
out = {"meta": point.summary}
for dset in agg_dset:
ds = point.h5.open_dataset(dset)
out[dset] = agg_method(ds)
return out
[docs]class GenerationSupplyCurvePoint(AggregationSupplyCurvePoint):
"""Supply curve point summary framework that ties a reV SC point to its
respective generation and resource data."""
# technology-dependent power density estimates in MW/km2
POWER_DENSITY = {"pv": 36, "wind": 3}
def __init__(
self,
gid,
excl,
gen,
tm_dset,
gen_index,
excl_dict=None,
inclusion_mask=None,
res_class_dset=None,
res_class_bin=None,
excl_area=None,
power_density=None,
cf_dset="cf_mean-means",
lcoe_dset="lcoe_fcr-means",
h5_dsets=None,
resolution=64,
exclusion_shape=None,
close=False,
friction_layer=None,
recalc_lcoe=True,
apply_exclusions=True,
):
"""
Parameters
----------
gid : int
gid for supply curve point to analyze.
excl : str | ExclusionMask
Filepath to exclusions h5 or ExclusionMask file handler.
gen : str | reV.handlers.Outputs
Filepath to .h5 reV generation output results or reV Outputs file
handler.
tm_dset : str
Dataset name in the techmap file containing the
exclusions-to-resource mapping data.
gen_index : np.ndarray
Array of generation gids with array index equal to resource gid.
Array value is -1 if the resource index was not used in the
generation run.
excl_dict : dict | None
Dictionary of exclusion keyword arugments of the format
{layer_dset_name: {kwarg: value}} where layer_dset_name is a
dataset in the exclusion h5 file and kwarg is a keyword argument to
the reV.supply_curve.exclusions.LayerMask class.
None if excl input is pre-initialized.
inclusion_mask : np.ndarray
2D array pre-extracted inclusion mask where 1 is included and 0 is
excluded. The shape of this will be checked against the input
resolution.
res_class_dset : str | np.ndarray | None
Dataset in the generation file dictating resource classes.
Can be pre-extracted resource data in np.ndarray.
None if no resource classes.
res_class_bin : list | None
Two-entry lists dictating the single resource class bin.
None if no resource classes.
excl_area : float | None, optional
Area of an exclusion pixel in km2. None will try to infer the area
from the profile transform attribute in excl_fpath, by default None
power_density : float | None | pd.DataFrame
Constant power density float, None, or opened dataframe with
(resource) "gid" and "power_density columns".
cf_dset : str | np.ndarray
Dataset name from gen containing capacity factor mean values.
This name is used to infer AC capacity factor dataset for
solar runs (i.e. the AC vsersion of "cf_mean-means" would
be inferred to be "cf_mean_ac-means"). This input can also
be pre-extracted generation output data in np.ndarray, in
which case all DC solar outputs are set to `None`.
lcoe_dset : str | np.ndarray
Dataset name from gen containing LCOE mean values.
Can be pre-extracted generation output data in np.ndarray.
h5_dsets : None | list | dict
Optional list of dataset names to summarize from the gen/econ h5
files. Can also be pre-extracted data dictionary where keys are
the dataset names and values are the arrays of data from the
h5 files.
resolution : int | None
SC resolution, must be input in combination with gid.
exclusion_shape : tuple
Shape of the exclusions extent (rows, cols). Inputing this will
speed things up considerably.
close : bool
Flag to close object file handlers on exit.
friction_layer : None | FrictionMask
Friction layer with scalar friction values if valid friction inputs
were entered. Otherwise, None to not apply friction layer.
recalc_lcoe : bool
Flag to re-calculate the LCOE from the multi-year mean capacity
factor and annual energy production data. This requires several
datasets to be aggregated in the gen input: system_capacity,
fixed_charge_rate, capital_cost, fixed_operating_cost,
and variable_operating_cost.
apply_exclusions : bool
Flag to apply exclusions to the resource / generation gid's on
initialization.
"""
self._res_class_dset = res_class_dset
self._res_class_bin = res_class_bin
self._cf_dset = cf_dset
self._lcoe_dset = lcoe_dset
self._h5_dsets = h5_dsets
self._mean_res = None
self._res_data = None
self._gen_data = None
self._lcoe_data = None
self._pd_obj = None
self._power_density = self._power_density_ac = power_density
self._friction_layer = friction_layer
self._recalc_lcoe = recalc_lcoe
self._ssc = None
self._slk = {}
super().__init__(
gid,
excl,
gen,
tm_dset,
excl_dict=excl_dict,
inclusion_mask=inclusion_mask,
resolution=resolution,
excl_area=excl_area,
exclusion_shape=exclusion_shape,
close=close,
apply_exclusions=False,
)
self._res_gid_set = None
self._gen_gid_set = None
self._gen_fpath, self._gen = self._h5_fpath, self._h5
self._gen_gids, self._res_gids = self._map_gen_gids(
self._gids, gen_index
)
self._gids = self._gen_gids
if (self._gen_gids != -1).sum() == 0:
emsg = (
"Supply curve point gid {} has no viable exclusion "
'points based on exclusions file: "{}"'.format(
self._gid, self._excl_fpath
)
)
raise EmptySupplyCurvePointError(emsg)
if apply_exclusions:
self._apply_exclusions()
[docs] def exclusion_weighted_mean(self, flat_arr):
"""Calc the exclusions-weighted mean value of a flat array of gen data.
Parameters
----------
flat_arr : np.ndarray
Flattened array of resource/generation/econ data. Must be
index-able with the self._gen_gids array (must be a 1D array with
an entry for every site in the generation extent).
Returns
-------
mean : float
Mean of flat_arr masked by the binary exclusions then weighted by
the non-zero exclusions.
"""
x = flat_arr[self._gen_gids[self.bool_mask]].astype("float32")
incl = self.include_mask_flat[self.bool_mask]
x *= incl
mean = x.sum() / incl.sum()
return mean
@property
def gen(self):
"""Get the generation output object.
Returns
-------
_gen : Resource
reV generation Resource object
"""
if self._gen is None:
self._gen = Resource(self._gen_fpath, str_decode=False)
return self._gen
@property
def res_gid_set(self):
"""Get list of unique resource gids corresponding to this sc point.
Returns
-------
res_gids : list
List of resource gids.
"""
if self._res_gid_set is None:
self._res_gid_set = self._ordered_unique(self._res_gids)
if -1 in self._res_gid_set:
self._res_gid_set.remove(-1)
return self._res_gid_set
@property
def gen_gid_set(self):
"""Get list of unique generation gids corresponding to this sc point.
Returns
-------
gen_gids : list
List of generation gids.
"""
if self._gen_gid_set is None:
self._gen_gid_set = self._ordered_unique(self._gen_gids)
if -1 in self._gen_gid_set:
self._gen_gid_set.remove(-1)
return self._gen_gid_set
@property
def h5_gid_set(self):
"""Get list of unique h5 gids corresponding to this sc point.
Same as gen_gid_set
Returns
-------
h5_gids : list
List of h5 gids.
"""
return self.gen_gid_set
@property
def gid_counts(self):
"""Get the number of exclusion pixels in each resource/generation gid
corresponding to this sc point.
Returns
-------
gid_counts : list
List of exclusion pixels in each resource/generation gid.
"""
gid_counts = [
self.include_mask_flat[(self._res_gids == gid)].sum()
for gid in self.res_gid_set
]
return gid_counts
@property
def res_data(self):
"""Get the resource data array.
Returns
-------
_res_data : np.ndarray
Multi-year-mean resource data array for all sites in the
generation data output file.
"""
if isinstance(self._res_class_dset, np.ndarray):
return self._res_class_dset
if self._res_data is None:
if self._res_class_dset in self.gen.datasets:
self._res_data = self.gen[self._res_class_dset]
return self._res_data
@property
def gen_data(self):
"""Get the generation capacity factor data array.
Returns
-------
_gen_data : np.ndarray
Multi-year-mean capacity factor data array for all sites in the
generation data output file.
"""
if isinstance(self._cf_dset, np.ndarray):
return self._cf_dset
if self._gen_data is None:
if self._cf_dset in self.gen.datasets:
self._gen_data = self.gen[self._cf_dset]
return self._gen_data
@property
def gen_ac_data(self):
"""Get the generation ac capacity factor data array.
This output is only not `None` for solar runs where `cf_dset`
was specified as a string.
Returns
-------
gen_ac_data : np.ndarray | None
Multi-year-mean ac capacity factor data array for all sites
in the generation data output file or `None` if none
detected.
"""
if isinstance(self._cf_dset, np.ndarray):
return None
ac_cf_dset = _infer_cf_dset_ac(self._cf_dset)
if ac_cf_dset in self.gen.datasets:
return self.gen[ac_cf_dset]
return None
@property
def lcoe_data(self):
"""Get the LCOE data array.
Returns
-------
_lcoe_data : np.ndarray
Multi-year-mean LCOE data array for all sites in the
generation data output file.
"""
if isinstance(self._lcoe_dset, np.ndarray):
return self._lcoe_dset
if self._lcoe_data is None:
if self._lcoe_dset in self.gen.datasets:
self._lcoe_data = self.gen[self._lcoe_dset]
return self._lcoe_data
@property
def mean_cf(self):
"""Get the mean capacity factor for the non-excluded data. Capacity
factor is weighted by the exclusions (usually 0 or 1, but 0.5
exclusions will weight appropriately).
This value represents DC capacity factor for solar and AC
capacity factor for all other technologies. This is the capacity
factor that should be used for all cost calculations for ALL
technologies (to align with SAM).
Returns
-------
mean_cf : float | None
Mean capacity factor value for the non-excluded data.
"""
mean_cf = None
if self.gen_data is not None:
mean_cf = self.exclusion_weighted_mean(self.gen_data)
return mean_cf
@property
def mean_cf_ac(self):
"""Get the mean AC capacity factor for the non-excluded data.
This output is only not `None` for solar runs.
Capacity factor is weighted by the exclusions (usually 0 or 1,
but 0.5 exclusions will weight appropriately).
Returns
-------
mean_cf_ac : float | None
Mean capacity factor value for the non-excluded data.
"""
mean_cf_ac = None
if self.gen_ac_data is not None:
mean_cf_ac = self.exclusion_weighted_mean(self.gen_ac_data)
return mean_cf_ac
@property
def mean_cf_dc(self):
"""Get the mean DC capacity factor for the non-excluded data.
This output is only not `None` for solar runs.
Capacity factor is weighted by the exclusions (usually 0 or 1,
but 0.5 exclusions will weight appropriately).
Returns
-------
mean_cf_dc : float | None
Mean capacity factor value for the non-excluded data.
"""
if self.mean_cf_ac is not None:
return self.mean_cf
return None
@property
def mean_lcoe(self):
"""Get the mean LCOE for the non-excluded data.
Returns
-------
mean_lcoe : float | None
Mean LCOE value for the non-excluded data.
"""
mean_lcoe = None
# prioritize the calculation of lcoe explicitly from the multi year
# mean CF (the lcoe re-calc will still happen if mean_cf is a single
# year CF, but the output should be identical to the original LCOE and
# so is not consequential).
if self._recalc_lcoe:
required = ("fixed_charge_rate", "capital_cost",
"fixed_operating_cost", "variable_operating_cost",
"system_capacity")
if all(self._sam_lcoe_kwargs.get(k) is not None for k in required):
aep = (
self._sam_lcoe_kwargs["system_capacity"]
* self.mean_cf
* 8760
)
# Note the AEP computation uses the SAM config
# `system_capacity`, so no need to scale `capital_cost`
# or `fixed_operating_cost` by anything
mean_lcoe = lcoe_fcr(
self._sam_lcoe_kwargs["fixed_charge_rate"],
self._sam_lcoe_kwargs["capital_cost"],
self._sam_lcoe_kwargs["fixed_operating_cost"],
aep,
self._sam_lcoe_kwargs["variable_operating_cost"],
)
# alternative if lcoe was not able to be re-calculated from
# multi year mean CF
if mean_lcoe is None and self.lcoe_data is not None:
mean_lcoe = self.exclusion_weighted_mean(self.lcoe_data)
return mean_lcoe
@property
def mean_res(self):
"""Get the mean resource for the non-excluded data.
Returns
-------
mean_res : float | None
Mean resource for the non-excluded data.
"""
mean_res = None
if self._res_class_dset is not None:
mean_res = self.exclusion_weighted_mean(self.res_data)
return mean_res
@property
def mean_lcoe_friction(self):
"""Get the mean LCOE for the non-excluded data, multiplied by the
mean_friction scalar value.
Returns
-------
mean_lcoe_friction : float | None
Mean LCOE value for the non-excluded data multiplied by the
mean friction scalar value.
"""
mean_lcoe_friction = None
if self.mean_lcoe is not None and self.mean_friction is not None:
mean_lcoe_friction = self.mean_lcoe * self.mean_friction
return mean_lcoe_friction
@property
def mean_friction(self):
"""Get the mean friction scalar for the non-excluded data.
Returns
-------
friction : None | float
Mean value of the friction data layer for the non-excluded data.
If friction layer is not input to this class, None is returned.
"""
friction = None
if self._friction_layer is not None:
friction = self.friction_data.flatten()[self.bool_mask].mean()
return friction
@property
def friction_data(self):
"""Get the friction data for the full SC point (no exclusions)
Returns
-------
friction_data : None | np.ndarray
2D friction data layer corresponding to the exclusions grid in
the SC domain. If friction layer is not input to this class,
None is returned.
"""
friction_data = None
if self._friction_layer is not None:
friction_data = self._friction_layer[self.rows, self.cols]
return friction_data
@property
def power_density(self):
"""Get the estimated power density either from input or infered from
generation output meta.
Returns
-------
_power_density : float
Estimated power density in MW/km2
"""
if self._power_density is None:
tech = self.gen.meta["reV_tech"][0]
if tech in self.POWER_DENSITY:
self._power_density = self.POWER_DENSITY[tech]
else:
warn(
"Could not recognize reV technology in generation meta "
'data: "{}". Cannot lookup an appropriate power density '
"to calculate SC point capacity.".format(tech)
)
elif isinstance(self._power_density, pd.DataFrame):
self._pd_obj = self._power_density
missing = set(self.res_gid_set) - set(self._pd_obj.index.values)
if any(missing):
msg = (
"Variable power density input is missing the "
"following resource GIDs: {}".format(missing)
)
logger.error(msg)
raise FileInputError(msg)
pds = self._pd_obj.loc[
self._res_gids[self.bool_mask], "power_density"
].values
pds = pds.astype(np.float32)
pds *= self.include_mask_flat[self.bool_mask]
denom = self.include_mask_flat[self.bool_mask].sum()
self._power_density = pds.sum() / denom
return self._power_density
@property
def power_density_ac(self):
"""Get the estimated AC power density either from input or
inferred from generation output meta.
This value is only available for solar runs with a "dc_ac_ratio"
dataset in the generation file. If these conditions are not met,
this value is `None`.
Returns
-------
_power_density_ac : float | None
Estimated AC power density in MW/km2
"""
if "dc_ac_ratio" not in self.gen.datasets:
return None
ilr = self.gen["dc_ac_ratio", self._gen_gids[self.bool_mask]]
ilr = ilr.astype("float32")
weights = self.include_mask_flat[self.bool_mask]
if self._power_density_ac is None:
tech = self.gen.meta["reV_tech"][0]
if tech in self.POWER_DENSITY:
power_density_ac = self.POWER_DENSITY[tech] / ilr
power_density_ac *= weights
power_density_ac = power_density_ac.sum() / weights.sum()
else:
warn(
"Could not recognize reV technology in generation meta "
'data: "{}". Cannot lookup an appropriate power density '
"to calculate SC point capacity.".format(tech)
)
power_density_ac = None
elif isinstance(self._power_density_ac, pd.DataFrame):
self._pd_obj = self._power_density_ac
missing = set(self.res_gid_set) - set(self._pd_obj.index.values)
if any(missing):
msg = (
"Variable power density input is missing the "
"following resource GIDs: {}".format(missing)
)
logger.error(msg)
raise FileInputError(msg)
pds = self._pd_obj.loc[
self._res_gids[self.bool_mask], "power_density"
].values
power_density_ac = pds.astype(np.float32) / ilr
power_density_ac *= weights
power_density_ac = power_density_ac.sum() / weights.sum()
else:
power_density_ac = self._power_density_ac * weights / ilr
power_density_ac = power_density_ac.sum() / weights.sum()
return power_density_ac
@property
def capacity(self):
"""Get the estimated capacity in MW of the supply curve point in the
current resource class with the applied exclusions.
This value represents DC capacity for solar and AC capacity for
all other technologies. This is the capacity that should be used
for all cost calculations for ALL technologies (to align with
SAM).
Returns
-------
capacity : float
Estimated capacity in MW of the supply curve point in the
current resource class with the applied exclusions.
"""
capacity = None
if self.power_density is not None:
capacity = self.area * self.power_density
return capacity
@property
def capacity_ac(self):
"""Get the AC estimated capacity in MW of the supply curve point in the
current resource class with the applied exclusions.
This value is provided only for solar inputs that have
the "dc_ac_ratio" dataset in the generation file. If these
conditions are not met, this value is `None`.
Returns
-------
capacity : float | None
Estimated AC capacity in MW of the supply curve point in the
current resource class with the applied exclusions. Only not
`None` for solar runs with "dc_ac_ratio" dataset in the
generation file
"""
if self.power_density_ac is None:
return None
return self.area * self.power_density_ac
@property
def capacity_dc(self):
"""Get the DC estimated capacity in MW of the supply curve point
in the current resource class with the applied exclusions.
This value is provided only for solar inputs that have
the "dc_ac_ratio" dataset in the generation file. If these
conditions are not met, this value is `None`.
Returns
-------
capacity : float | None
Estimated AC capacity in MW of the supply curve point in the
current resource class with the applied exclusions. Only not
`None` for solar runs with "dc_ac_ratio" dataset in the
generation file
"""
if self.power_density_ac is None:
return None
return self.area * self.power_density
@property
def sc_point_annual_energy(self):
"""Get the total annual energy (MWh) for the entire SC point.
This value is computed using the capacity of the supply curve
point as well as the mean capacity factor. If the mean capacity
factor is `None`, this value will also be `None`.
Returns
-------
sc_point_annual_energy : float | None
Total annual energy (MWh) for the entire SC point.
"""
if self.mean_cf is None:
return None
return self.mean_cf * self.capacity * 8760
@property
def h5_dsets_data(self):
"""Get any additional/supplemental h5 dataset data to summarize.
Returns
-------
h5_dsets_data : dict | None
"""
_h5_dsets_data = None
if isinstance(self._h5_dsets, (list, tuple)):
_h5_dsets_data = {}
for dset in self._h5_dsets:
if dset in self.gen.datasets:
_h5_dsets_data[dset] = self.gen[dset]
elif isinstance(self._h5_dsets, dict):
_h5_dsets_data = self._h5_dsets
elif self._h5_dsets is not None:
e = (
"Cannot recognize h5_dsets input type, should be None, "
"a list of dataset names, or a dictionary or "
"pre-extracted data. Received: {} {}".format(
type(self._h5_dsets), self._h5_dsets
)
)
logger.error(e)
raise TypeError(e)
return _h5_dsets_data
@property
def regional_multiplier(self):
"""float: Mean regional capital cost multiplier, defaults to 1."""
if "capital_cost_multiplier" not in self.gen.datasets:
return 1
multipliers = self.gen["capital_cost_multiplier"]
return self.exclusion_weighted_mean(multipliers)
@property
def fixed_charge_rate(self):
"""float: Mean fixed_charge_rate, defaults to 0."""
if "fixed_charge_rate" not in self.gen.datasets:
return 0
return self.exclusion_weighted_mean(self.gen["fixed_charge_rate"])
@property
def _sam_system_capacity_kw(self):
"""float: Mean SAM generation system capacity input, defaults to 0. """
if self._ssc is not None:
return self._ssc
self._ssc = 0
if "system_capacity" in self.gen.datasets:
self._ssc = self.exclusion_weighted_mean(
self.gen["system_capacity"]
)
return self._ssc
@property
def _sam_lcoe_kwargs(self):
"""dict: Mean LCOE inputs, as passed to SAM during generation."""
if self._slk:
return self._slk
self._slk = {"capital_cost": None, "fixed_operating_cost": None,
"variable_operating_cost": None,
"fixed_charge_rate": None, "system_capacity": None}
for dset in self._slk:
if dset in self.gen.datasets:
self._slk[dset] = self.exclusion_weighted_mean(
self.gen[dset]
)
return self._slk
def _compute_cost_per_ac_mw(self, dset):
"""Compute a cost per AC MW for a given input. """
if self._sam_system_capacity_kw <= 0:
return None
if dset not in self.gen.datasets:
return None
sam_cost = self.exclusion_weighted_mean(self.gen[dset])
sam_cost_per_mw = sam_cost / (self._sam_system_capacity_kw / 1000)
sc_point_cost = sam_cost_per_mw * self.capacity
ac_cap = (self.capacity
if self.capacity_ac is None
else self.capacity_ac)
return sc_point_cost / ac_cap
@property
def mean_h5_dsets_data(self):
"""Get the mean supplemental h5 datasets data (optional)
Returns
-------
mean_h5_dsets_data : dict | None
Mean dataset values for the non-excluded data for the optional
h5_dsets input.
"""
_mean_h5_dsets_data = None
if self.h5_dsets_data is not None:
_mean_h5_dsets_data = {}
for dset, arr in self.h5_dsets_data.items():
_mean_h5_dsets_data[dset] = self.exclusion_weighted_mean(arr)
return _mean_h5_dsets_data
def _apply_exclusions(self):
"""Apply exclusions by masking the generation and resource gid arrays.
This removes all res/gen entries that are masked by the exclusions or
resource bin."""
# exclusions mask is False where excluded
exclude = self.include_mask_flat == 0
exclude = self._resource_exclusion(exclude)
self._gen_gids[exclude] = -1
self._res_gids[exclude] = -1
# ensure that excluded pixels (including resource exclusions!)
# has an exclusions multiplier of 0
exclude = exclude.reshape(self.include_mask.shape)
self._incl_mask[exclude] = 0.0
self._incl_mask = self._incl_mask.flatten()
if (self._gen_gids != -1).sum() == 0:
msg = (
"Supply curve point gid {} is completely excluded for res "
"bin: {}".format(self._gid, self._res_class_bin)
)
raise EmptySupplyCurvePointError(msg)
def _resource_exclusion(self, boolean_exclude):
"""Include the resource exclusion into a pre-existing bool exclusion.
Parameters
----------
boolean_exclude : np.ndarray
Boolean exclusion array (True is exclude).
Returns
-------
boolean_exclude : np.ndarray
Same as input but includes additional exclusions for resource
outside of current resource class bin.
"""
if (
self._res_class_dset is not None
and self._res_class_bin is not None
):
rex = self.res_data[self._gen_gids]
rex = (rex < np.min(self._res_class_bin)) | (
rex >= np.max(self._res_class_bin)
)
boolean_exclude = boolean_exclude | rex
return boolean_exclude
[docs] def point_summary(self, args=None):
"""
Get a summary dictionary of a single supply curve point.
Parameters
----------
args : tuple | list | None
List of summary arguments to include. None defaults to all
available args defined in the class attr.
Returns
-------
summary : dict
Dictionary of summary outputs for this sc point.
"""
ARGS = {
SupplyCurveField.LATITUDE: self.latitude,
SupplyCurveField.LONGITUDE: self.longitude,
SupplyCurveField.COUNTRY: self.country,
SupplyCurveField.STATE: self.state,
SupplyCurveField.COUNTY: self.county,
SupplyCurveField.ELEVATION: self.elevation,
SupplyCurveField.TIMEZONE: self.timezone,
SupplyCurveField.SC_POINT_GID: self.sc_point_gid,
SupplyCurveField.SC_ROW_IND: self.sc_row_ind,
SupplyCurveField.SC_COL_IND: self.sc_col_ind,
SupplyCurveField.RES_GIDS: self.res_gid_set,
SupplyCurveField.GEN_GIDS: self.gen_gid_set,
SupplyCurveField.GID_COUNTS: self.gid_counts,
SupplyCurveField.N_GIDS: self.n_gids,
SupplyCurveField.OFFSHORE: self.offshore,
SupplyCurveField.MEAN_CF_AC: (
self.mean_cf if self.mean_cf_ac is None else self.mean_cf_ac
),
SupplyCurveField.MEAN_CF_DC: self.mean_cf_dc,
SupplyCurveField.MEAN_LCOE: self.mean_lcoe,
SupplyCurveField.MEAN_RES: self.mean_res,
SupplyCurveField.AREA_SQ_KM: self.area,
SupplyCurveField.CAPACITY_AC_MW: (
self.capacity if self.capacity_ac is None else self.capacity_ac
),
SupplyCurveField.CAPACITY_DC_MW: self.capacity_dc,
SupplyCurveField.EOS_MULT: 1, # added later
SupplyCurveField.REG_MULT: self.regional_multiplier,
SupplyCurveField.SC_POINT_ANNUAL_ENERGY_MWH: (
self.sc_point_annual_energy
),
SupplyCurveField.COST_SITE_OCC_USD_PER_AC_MW: (
self._compute_cost_per_ac_mw("capital_cost")
),
SupplyCurveField.COST_BASE_OCC_USD_PER_AC_MW: (
self._compute_cost_per_ac_mw("base_capital_cost")
),
SupplyCurveField.COST_SITE_FOC_USD_PER_AC_MW: (
self._compute_cost_per_ac_mw("fixed_operating_cost")
),
SupplyCurveField.COST_BASE_FOC_USD_PER_AC_MW: (
self._compute_cost_per_ac_mw("base_fixed_operating_cost")
),
SupplyCurveField.COST_SITE_VOC_USD_PER_AC_MW: (
self._compute_cost_per_ac_mw("variable_operating_cost")
),
SupplyCurveField.COST_BASE_VOC_USD_PER_AC_MW: (
self._compute_cost_per_ac_mw("base_variable_operating_cost")
),
SupplyCurveField.FIXED_CHARGE_RATE: self.fixed_charge_rate,
}
if self._friction_layer is not None:
ARGS[SupplyCurveField.MEAN_FRICTION] = self.mean_friction
ARGS[SupplyCurveField.MEAN_LCOE_FRICTION] = self.mean_lcoe_friction
if self._h5_dsets is not None:
for dset, data in self.mean_h5_dsets_data.items():
ARGS["mean_{}".format(dset)] = data
if args is None:
args = list(ARGS.keys())
summary = {}
for arg in args:
if arg in ARGS:
summary[arg] = ARGS[arg]
else:
warn(
'Cannot find "{}" as an available SC self summary '
"output",
OutputWarning,
)
return summary
[docs] @staticmethod
def economies_of_scale(cap_cost_scale, summary):
"""Apply economies of scale to this point summary
Parameters
----------
cap_cost_scale : str
LCOE scaling equation to implement "economies of scale".
Equation must be in python string format and return a scalar
value to multiply the capital cost by. Independent variables in
the equation should match the names of the columns in the reV
supply curve aggregation table.
summary : dict
Dictionary of summary outputs for this sc point.
Returns
-------
summary : dict
Dictionary of summary outputs for this sc point.
"""
eos = EconomiesOfScale(cap_cost_scale, summary)
summary[SupplyCurveField.RAW_LCOE] = eos.raw_lcoe
summary[SupplyCurveField.MEAN_LCOE] = eos.scaled_lcoe
summary[SupplyCurveField.EOS_MULT] = eos.capital_cost_scalar
cost = summary[SupplyCurveField.COST_SITE_OCC_USD_PER_AC_MW]
if cost is not None:
summary[SupplyCurveField.COST_SITE_OCC_USD_PER_AC_MW] = (
cost * summary[SupplyCurveField.EOS_MULT]
)
return summary
[docs] @classmethod
def summarize(
cls,
gid,
excl_fpath,
gen_fpath,
tm_dset,
gen_index,
excl_dict=None,
inclusion_mask=None,
res_class_dset=None,
res_class_bin=None,
excl_area=None,
power_density=None,
cf_dset="cf_mean-means",
lcoe_dset="lcoe_fcr-means",
h5_dsets=None,
resolution=64,
exclusion_shape=None,
close=False,
friction_layer=None,
args=None,
data_layers=None,
cap_cost_scale=None,
recalc_lcoe=True,
):
"""Get a summary dictionary of a single supply curve point.
Parameters
----------
gid : int
gid for supply curve point to analyze.
excl_fpath : str
Filepath to exclusions h5.
gen_fpath : str
Filepath to .h5 reV generation output results.
tm_dset : str
Dataset name in the techmap file containing the
exclusions-to-resource mapping data.
gen_index : np.ndarray
Array of generation gids with array index equal to resource gid.
Array value is -1 if the resource index was not used in the
generation run.
excl_dict : dict | None
Dictionary of exclusion keyword arugments of the format
{layer_dset_name: {kwarg: value}} where layer_dset_name is a
dataset in the exclusion h5 file and kwarg is a keyword argument to
the reV.supply_curve.exclusions.LayerMask class.
None if excl input is pre-initialized.
inclusion_mask : np.ndarray
2D array pre-extracted inclusion mask where 1 is included and 0 is
excluded. The shape of this will be checked against the input
resolution.
res_class_dset : str | np.ndarray | None
Dataset in the generation file dictating resource classes.
Can be pre-extracted resource data in np.ndarray.
None if no resource classes.
res_class_bin : list | None
Two-entry lists dictating the single resource class bin.
None if no resource classes.
excl_area : float | None, optional
Area of an exclusion pixel in km2. None will try to infer the area
from the profile transform attribute in excl_fpath, by default None
power_density : float | None | pd.DataFrame
Constant power density float, None, or opened dataframe with
(resource) "gid" and "power_density columns".
cf_dset : str | np.ndarray
Dataset name from gen containing capacity factor mean values.
Can be pre-extracted generation output data in np.ndarray.
lcoe_dset : str | np.ndarray
Dataset name from gen containing LCOE mean values.
Can be pre-extracted generation output data in np.ndarray.
h5_dsets : None | list | dict
Optional list of dataset names to summarize from the gen/econ h5
files. Can also be pre-extracted data dictionary where keys are
the dataset names and values are the arrays of data from the
h5 files.
resolution : int | None
SC resolution, must be input in combination with gid.
exclusion_shape : tuple
Shape of the exclusions extent (rows, cols). Inputing this will
speed things up considerably.
close : bool
Flag to close object file handlers on exit.
friction_layer : None | FrictionMask
Friction layer with scalar friction values if valid friction inputs
were entered. Otherwise, None to not apply friction layer.
args : tuple | list, optional
List of summary arguments to include. None defaults to all
available args defined in the class attr, by default None
data_layers : dict, optional
Aggregation data layers. Must be a dictionary keyed by data label
name. Each value must be another dictionary with "dset", "method",
and "fpath", by default None
cap_cost_scale : str | None
Optional LCOE scaling equation to implement "economies of scale".
Equations must be in python string format and return a scalar
value to multiply the capital cost by. Independent variables in
the equation should match the names of the columns in the reV
supply curve aggregation table.
recalc_lcoe : bool
Flag to re-calculate the LCOE from the multi-year mean capacity
factor and annual energy production data. This requires several
datasets to be aggregated in the gen input: system_capacity,
fixed_charge_rate, capital_cost, fixed_operating_cost,
and variable_operating_cost.
Returns
-------
summary : dict
Dictionary of summary outputs for this sc point.
"""
kwargs = {
"excl_dict": excl_dict,
"inclusion_mask": inclusion_mask,
"res_class_dset": res_class_dset,
"res_class_bin": res_class_bin,
"excl_area": excl_area,
"power_density": power_density,
"cf_dset": cf_dset,
"lcoe_dset": lcoe_dset,
"h5_dsets": h5_dsets,
"resolution": resolution,
"exclusion_shape": exclusion_shape,
"close": close,
"friction_layer": friction_layer,
"recalc_lcoe": recalc_lcoe,
}
with cls(
gid, excl_fpath, gen_fpath, tm_dset, gen_index, **kwargs
) as point:
summary = point.point_summary(args=args)
if data_layers is not None:
summary = point.agg_data_layers(summary, data_layers)
if cap_cost_scale is not None:
summary = point.economies_of_scale(cap_cost_scale, summary)
for arg, val in summary.items():
if val is None:
summary[arg] = np.nan
return summary
def _infer_cf_dset_ac(cf_dset):
"""Infer AC dataset name from input. """
parts = cf_dset.split("-")
if len(parts) == 1:
return f"{cf_dset}_ac"
cf_name = "-".join(parts[:-1])
return f"{cf_name}_ac-{parts[-1]}"