# -*- coding: utf-8 -*-
"""
Created on Wed Aug 21 13:47:43 2019
@author: gbuster
"""
from abc import ABC
from collections import Counter
import datetime
import pytz
import copy
import json
import logging
import numpy as np
import pandas as pd
from scipy.spatial import cKDTree
from reVX.handlers.outputs import Outputs
from reVX.plexos.utilities import DataCleaner, get_coord_labels
logger = logging.getLogger(__name__)

# Short lowercase aliases for common timezone names. The alias is looked up
# by BaseProfileAggregation.tz_alias and is appended to exported file names;
# timezone names not listed here are used as-is.
TZ_ALIASES = {
    'UTC': 'utc',
    'Universal': 'utc',
    'US/Pacific': 'pst',
    'US/Mountain': 'mst',
    'US/Central': 'cst',
    'US/Eastern': 'est',
}
class PlexosNode:
    """Framework to build a gen profile at a single plexos node. The plexos
    node is defined as a power bus or some sort of plant that is intended to
    represent the aggregated power generation profile from one or more reV
    supply curve points that were mapped to the plexos node. Built capacity
    needs to be defined for each supply curve point that contributes to this
    node. Resource within each supply curve point is built in order of cf_mean.
    """

    def __init__(self, sc_build, cf_fpath, res_gids=None,
                 force_full_build=False, forecast_fpath=None,
                 forecast_map=None, dset_tag=None):
        """
        Parameters
        ----------
        sc_build : pd.DataFrame
            Supply curve buildout table. Must only have rows that are built
            in this plexos node. Must have res_gids, gid_counts, gid_capacity,
            and built_capacity at each SC point. Note that the gen_gids column
            in the rev_sc is ignored and only the res_gids from rev_sc are
            mapped to the corresponding "gid" column in the cf_fpath meta data.
        cf_fpath : str
            File path to capacity factor file (reV gen output) to
            get profiles from.
        res_gids : list | np.ndarray, optional
            Resource GID's available in cf_fpath, if None pull from cf_fpath,
            by default None
        force_full_build : bool
            Flag to ensure the full requested buildout is built at each SC
            point. If True, the remainder of the requested build will always
            be built at the last resource gid in the sc point.
        forecast_fpath : str | None, optional
            Forecasted capacity factor .h5 file path (reV results). If not
            None, the generation profiles are sourced from this file, by
            default None
        forecast_map : np.ndarray | None, optional
            Array of forecast meta data indices mapped to the generation
            meta indices (one entry per generation point). None if no
            forecast data being considered, by default None
        dset_tag : str
            Dataset tag to append to dataset names in cf profile file. e.g. If
            the cf profile file is a multi year file using dset_tag="-2008"
            will enable us to select the corresponding datasets
            (cf_mean-2008, cf_profile-2008, etc)
        """
        self._sc_build = DataCleaner.rename_cols(
            sc_build, name_map=DataCleaner.REV_NAME_MAP)
        self._cf_fpath = cf_fpath

        if res_gids is None:
            res_gids = self._get_res_gids(cf_fpath)

        # An array is required so that the elementwise "==" lookup in
        # _parse_sc_point works when a plain list is supplied.
        self._res_gids = np.asarray(res_gids)
        self._forecast_fpath = forecast_fpath
        self._forecast_map = forecast_map
        self._force_full_build = force_full_build
        self._dset_tag = dset_tag if dset_tag is not None else ""

    @staticmethod
    def _get_res_gids(cf_fpath):
        """
        Get available res_gids from cf .h5 file

        Parameters
        ----------
        cf_fpath : str
            File path to capacity factor file (reV gen output) to
            get profiles from.

        Returns
        -------
        res_gids : ndarray
            Array of resource GIDs available in the cf file
        """
        with Outputs(cf_fpath, mode='r') as cf_outs:
            res_gids = cf_outs.get_meta_arr('gid')

        if not isinstance(res_gids, np.ndarray):
            res_gids = np.array(list(res_gids))

        return res_gids

    def _get_sc_point_meta(self, row_idx):
        """Get a meta df for a single SC point at a given row index, which
        is part of this plexos node. Each row in the sc point meta data
        represents a resource pixel belonging to the sc point.

        Parameters
        ----------
        row_idx : int
            Index value for the row of the target SC point in self._sc_build.

        Returns
        -------
        sc_gid : int
            Supply curve point gid for this sc point.
        sc_meta : pd.DataFrame
            Dataframe with rows corresponding to resource/generation pixels
            that are part of this SC point. Sorted by cf_mean with best
            cf_mean at top. Only pixels with gid_capacity > 0 are kept and
            the gen_gid column holds plain integer indices.
        buildout : float
            Total REEDS requested buildout associated with SC point i.

        Raises
        ------
        RuntimeError
            If a resource gid with available capacity cannot be found in the
            cf file resource gids.
        """
        sc_point = self._sc_build.loc[row_idx]
        sc_gid, res_gids, gen_gids, gid_counts, gid_capacity, buildout, _ = \
            self._parse_sc_point(sc_point, self._res_gids)

        # _parse_sc_point returns one index array per resource gid. Collapse
        # each to a scalar index (-1 flags "not found") so the column can be
        # sorted numerically and used directly for the h5 lookup below.
        gen_index = [int(np.ravel(g)[0]) if np.size(g) > 0 else -1
                     for g in gen_gids]

        sc_meta = pd.DataFrame({'gen_gid': gen_index,
                                'res_gid': res_gids,
                                'gid_count': gid_counts,
                                'gid_capacity': gid_capacity})
        # .copy() so the column assignment below does not act on a slice
        sc_meta = sc_meta[sc_meta['gid_capacity'] > 0].copy()

        missing = sc_meta.loc[sc_meta['gen_gid'] < 0, 'res_gid'].tolist()
        if missing:
            msg = ('Could not find the following resource gids in the '
                   'cf file input: {}'.format(missing))
            logger.error(msg)
            raise RuntimeError(msg)

        # sort by gen_gid before the list-based read of cf_mean
        sc_meta = sc_meta.sort_values(by='gen_gid')

        cf_mean_dset = 'cf_mean' + self._dset_tag
        if sc_meta.empty:
            sc_meta[cf_mean_dset] = np.array([], dtype=np.float32)
        else:
            with Outputs(self._cf_fpath, mode='r') as cf_outs:
                cf_mean = cf_outs[cf_mean_dset, sc_meta['gen_gid'].tolist()]

            sc_meta[cf_mean_dset] = cf_mean

        sort_by = [cf_mean_dset, 'gid_capacity', 'res_gid']
        sc_meta = sc_meta.sort_values(by=sort_by, ascending=False)
        sc_meta = sc_meta.reset_index(drop=True)

        # infinite capacity in the last gid to make sure full buildout is done
        if self._force_full_build and not sc_meta.empty:
            sc_meta.loc[sc_meta.index[-1], 'gid_capacity'] = 1e6

        return sc_gid, sc_meta, buildout

    @staticmethod
    def _parse_sc_point(sc_point, all_res_gids):
        """Parse data from sc point.

        Parameters
        ----------
        sc_point : pd.Series
            Single row in the reV supply curve table to parse. Must have
            res_gids, gid_counts, gid_capacity, and built_capacity at each SC
            point. Note that the gen_gids column in the rev_sc is ignored and
            only the res_gids from rev_sc are mapped to the corresponding "gid"
            column in the cf_fpath meta data.
        all_res_gids : list | np.ndarray
            ALL resource GID's available in cf_fpath

        Returns
        -------
        sc_gid : int
            Supply curve point gid for this sc point.
        res_gids : list
            Resource GIDs associated with SC point i.
        gen_gids : list
            Generation (reV gen output) GIDs associated with SC point i. This
            is parsed from the res_gids column in the sc_point and the location
            of the res_gids in the all_res_gids array which is from the
            cf_fpath input. This is not taken directly from the gen_gids column
            Each entry is the index array returned by np.where for one
            resource gid (empty if that gid is not in all_res_gids).
        gid_counts : list
            Number of exclusion pixels that are included associated
            with each res_gid.
        gid_capacity : list | np.ndarray
            Available capacity in MW for each res_gid. If the sc point has no
            gid_capacity entry, the total capacity is split between the
            resource gids in proportion to gid_counts.
        buildout : float
            Total REEDS requested buildout associated with SC point i.
        capacity : float
            Total Supply Curve Point Capacity

        Raises
        ------
        RuntimeError
            If none of the resource gids can be found in all_res_gids.
        """
        sc_gid = int(sc_point['sc_gid'])
        buildout = float(sc_point['built_capacity'])
        capacity = float(sc_point['potential_capacity'])

        res_gids = sc_point['res_gids']
        if isinstance(res_gids, str):
            res_gids = json.loads(res_gids)

        gid_counts = sc_point['gid_counts']
        if isinstance(gid_counts, str):
            gid_counts = json.loads(gid_counts)

        if 'gid_capacity' in sc_point:
            gid_capacity = sc_point['gid_capacity']
            if isinstance(gid_capacity, str):
                gid_capacity = json.loads(gid_capacity)
        else:
            # gid_counts is a plain list after json.loads, which does not
            # support division, so do the arithmetic on an array.
            counts = np.asarray(gid_counts, dtype=float)
            gid_capacity = counts / counts.sum() * capacity

        # "list == scalar" is a single False, not an elementwise comparison
        all_res_gids = np.asarray(all_res_gids)
        gen_gids = [np.where(all_res_gids == g)[0] for g in res_gids]

        # Test for matches by size. The truth value of an index array depends
        # on the index it holds (index 0 is falsy) and is ambiguous when the
        # array is empty.
        if not any(np.size(g) > 0 for g in gen_gids):
            msg = ('Could not find the following resource gids in the '
                   'cf file input: {}'.format(res_gids))
            logger.error(msg)
            raise RuntimeError(msg)

        return (sc_gid, res_gids, gen_gids, gid_counts, gid_capacity,
                buildout, capacity)

    def _build_sc_point_profile(self, row_idx, profile):
        """Build a power generation profile based on a
        single supply curve point.

        Parameters
        ----------
        row_idx : int
            Index value for the row of the target SC point in self._sc_build.
        profile : np.ndarray | None
            (t,) array of generation in MW, or None if this is the first
            SC point to add generation. If an array is input, generation is
            accumulated into it in place.

        Returns
        -------
        profile : np.ndarray
            (t,) array of generation in MW where t is the timeindex length.
        sc_gids : list
            List of supply curve point GID's that were build for this point
            (really just a list with one integer).
        res_gids : list
            List of resource GID's that were built from this SC point.
        gen_gids : list
            List of generation GID's that were built from this SC point.
            These are indices into the forecast file if a forecast map is
            being used.
        res_built : list
            List of built capacities at each resource GID from this SC point.

        Raises
        ------
        RuntimeError
            If the requested buildout cannot be fully built or if no profile
            could be created.
        """
        sc_gid, sc_meta, buildout = self._get_sc_point_meta(row_idx)
        full_buildout = buildout

        sc_gids = [sc_gid]
        res_gids = []
        gen_gids = []
        res_built = []

        use_forecast = self._forecast_map is not None
        src_fpath = self._forecast_fpath if use_forecast else self._cf_fpath
        profile_dset = 'cf_profile' + self._dset_tag

        # Only open the source file if there is something to build, and open
        # it once for the whole SC point instead of once per resource pixel.
        if buildout > 0 and not sc_meta.empty:
            with Outputs(src_fpath, mode='r') as cf_outs:
                for _, row in sc_meta.iterrows():
                    gid_capacity = row['gid_capacity']
                    if not gid_capacity > 0:
                        continue

                    if buildout <= gid_capacity:
                        to_build = buildout
                    else:
                        to_build = gid_capacity

                    buildout -= to_build
                    res_built.append(np.round(to_build, decimals=5))

                    gen_gid = int(row['gen_gid'])
                    if use_forecast:
                        gen_gid = int(np.ravel(self._forecast_map[gen_gid])[0])

                    # Keep each profile 1D so that the in-place accumulation
                    # cannot hit a (t,) += (t, 1) broadcasting error.
                    cf_profile = np.ravel(cf_outs[profile_dset, :, gen_gid])

                    res_gids.append(row['res_gid'])
                    gen_gids.append(gen_gid)

                    if profile is None:
                        profile = to_build * cf_profile
                    else:
                        profile += to_build * cf_profile

                    if buildout <= 0:
                        break

        if buildout > 1e-6:
            msg = ('PlexosNode wasnt able to build out fully for supply '
                   'curve gid {}. {:.4e} MW of capacity remain to be built '
                   'out of {:.4f} MW requested.'
                   .format(sc_gid, buildout, full_buildout))
            logger.error(msg)
            raise RuntimeError(msg)

        if profile is None:
            msg = ('PlexosNode object could not build profile for '
                   'sc buildout: {}'.format(self._sc_build))
            logger.error(msg)
            raise RuntimeError(msg)

        if len(profile.shape) != 1:
            profile = profile.flatten()

        return profile, sc_gids, res_gids, gen_gids, res_built

    def make_node_profile(self):
        """Make an aggregated generation profile for a single plexos node.

        Returns
        -------
        profile : np.ndarray
            (t, ) array of generation in MW.
        sc_gids : list
            List of supply curve point GID's that were build for this node
        res_gids : list
            List of resource GID's that were built for this plexos node.
        gen_gids : list
            List of generation GID's that were built for this plexos node.
        res_built : list
            List of built capacities at each resource GID for this plexos node.
        """
        profile = None
        sc_gids = []
        res_gids = []
        gen_gids = []
        res_built = []

        # Each SC point adds its generation to the running node profile
        for i in self._sc_build.index.values:
            profile, i_sc_gids, i_res_gids, i_gen_gids, i_res_built = \
                self._build_sc_point_profile(i, profile)

            sc_gids.extend(i_sc_gids)
            res_gids.extend(i_res_gids)
            gen_gids.extend(i_gen_gids)
            res_built.extend(i_res_built)

        return profile, sc_gids, res_gids, gen_gids, res_built

    @classmethod
    def run(cls, sc_build, cf_fpath, res_gids=None, force_full_build=False,
            forecast_fpath=None, forecast_map=None, dset_tag=None):
        """Make an aggregated generation profile for a single plexos node.

        Parameters
        ----------
        sc_build : pd.DataFrame
            Supply curve buildout table. Must only have rows that are built
            in this plexos node. Must have res_gids, gid_counts, gid_capacity,
            and built_capacity at each SC point. Note that the gen_gids column
            in the rev_sc is ignored and only the res_gids from rev_sc are
            mapped to the corresponding "gid" column in the cf_fpath meta data.
        cf_fpath : str
            File path to capacity factor file (reV gen output) to
            get profiles from.
        res_gids : list | np.ndarray, optional
            Resource GID's available in cf_fpath, if None pull from cf_fpath,
            by default None
        force_full_build : bool
            Flag to ensure the full requested buildout is built at each SC
            point. If True, the remainder of the requested build will always
            be built at the last resource gid in the sc point.
        forecast_fpath : str | None, optional
            Forecasted capacity factor .h5 file path (reV results).
            If not None, the generation profiles are sourced from this file,
            by default None
        forecast_map : np.ndarray | None, optional
            Array of forecast meta data indices mapped to the generation
            meta indices (one entry per generation point). None if
            no forecast data being considered,
            by default None
        dset_tag : str
            Dataset tag to append to dataset names in cf profile file. e.g. If
            the cf profile file is a multi year file using dset_tag="-2008"
            will enable us to select the corresponding datasets
            (cf_mean-2008, cf_profile-2008, etc)

        Returns
        -------
        profile : np.ndarray
            (t, ) array of generation in MW.
        sc_gids : list
            List of supply curve point GID's that were build for this node
        res_gids : list
            List of resource GID's that were built for this plexos node.
        gen_gids : list
            List of generation GID's that were built for this plexos node.
        res_built : list
            List of built capacities at each resource GID for this plexos node.
        """
        node = cls(sc_build, cf_fpath, res_gids=res_gids,
                   force_full_build=force_full_build,
                   forecast_fpath=forecast_fpath,
                   forecast_map=forecast_map,
                   dset_tag=dset_tag)

        return node.make_node_profile()
class BaseProfileAggregation(ABC):
    """
    Base framework to aggregate rev generation profiles to plants or plexos
    nodes.
    """

    def __init__(self):
        """Placeholder for concrete initialization"""
        self._cf_fpath = None
        self._res_gids = None
        self._node_map = None
        self._forecast_fpath = None
        self._forecast_map = None
        self._output_meta = None
        self._time_index = None
        self._timezone = None
        self._plant_name_col = None
        self._tech_tag = None
        self._dset_tag = ""

    @property
    def time_index(self):
        """Get the generation profile time index.

        Returns
        -------
        time_index : pd.Datetimeindex
            Pandas datetime index sourced from the capacity factor data.
        """
        if self._time_index is None:
            with Outputs(self._cf_fpath, mode='r') as cf_outs:
                self._time_index = cf_outs['time_index' + self._dset_tag]

        return self._time_index

    @property
    def tz_alias(self):
        """Get a short 3-char tz alias if the timezone is common in the US
        (pst, mst, cst, est)

        Returns
        -------
        str
            Alias from TZ_ALIASES, or the timezone itself if it has no alias.
        """
        return TZ_ALIASES.get(self._timezone, self._timezone)

    @property
    def available_res_gids(self):
        """Resource gids available in the cf file.

        Returns
        -------
        res_gids : np.ndarray
            Array of resource GIDs available in the cf file.
        """
        if self._res_gids is None:
            with Outputs(self._cf_fpath, mode='r') as cf_outs:
                self._res_gids = cf_outs.get_meta_arr('gid')

            if not isinstance(self._res_gids, np.ndarray):
                self._res_gids = np.array(list(self._res_gids))

        return self._res_gids

    @property
    def node_map(self):
        """Nearest neighbor output mapping rev supply curve points to plants
        or plexos nodes.

        Returns
        -------
        np.ndarray
        """
        return self._node_map

    @property
    def forecast_map(self):
        """An array mapping the reV "actuals" generation data to forecast data
        of a different resolution (if input). There is one entry per "actuals"
        generation data point, so self.forecast_map[9] yields the forecast
        index that corresponds to index 9 in the cf_fpath reV generation
        output.

        Returns
        -------
        np.ndarray | None
        """
        return self._forecast_map

    @staticmethod
    def _make_forecast_map(cf_fpath, forecast_fpath):
        """Run ckdtree to map forecast pixels to generation pixels.

        Parameters
        ----------
        cf_fpath : str
            File path to capacity factor file (reV gen output) to
            get profiles from.
        forecast_fpath : str | None
            Forecasted capacity factor .h5 file path (reV results).
            If not None, the generation profiles are sourced from this file.

        Returns
        -------
        fmap : np.ndarray | None
            Array of forecast meta data indices (nearest neighbor in
            coordinate space) with one entry per generation point. None if
            no forecast filepath input.
        """
        fmap = None
        if forecast_fpath is not None:
            logger.info('Making KDTree from forecast data: {}'
                        .format(forecast_fpath))
            with Outputs(cf_fpath) as out:
                meta_cf = out.meta

            with Outputs(forecast_fpath) as out:
                meta_fo = out.meta

            clabels = get_coord_labels(meta_cf)

            # The tree is built on the forecast coordinates and queried with
            # the generation coordinates, so fmap[i] is the forecast meta
            # index nearest to generation point i.
            # pylint: disable=not-callable
            tree = cKDTree(meta_fo[clabels].values)
            d, fmap = tree.query(meta_cf[clabels].values)
            logger.info('Distance (min / mean / max) from generation pixels '
                        'to forecast pixels is: {} / {} / {}'
                        .format(d.min(), d.mean(), d.max()))

        return fmap

    def _init_output(self, n_profiles):
        """Init the output array of aggregated profiles.

        Parameters
        ----------
        n_profiles : int
            Number of profiles that are going to be generated

        Returns
        -------
        output : np.ndarray
            (t, n) float32 array of zeros where t is the timeseries length
            and n is n_profiles.
        """
        # The time series length comes from the forecast file if one is set
        src_fpath = self._forecast_fpath
        if src_fpath is None:
            src_fpath = self._cf_fpath

        with Outputs(src_fpath, mode='r') as out:
            t = len(out['time_index' + self._dset_tag])

        return np.zeros((t, n_profiles), dtype=np.float32)

    def _ammend_output_meta(self, row_idx, sc_gids, res_gids, gen_gids,
                            res_built):
        """Ammend the output meta dataframe with new info about
        resource buildouts.

        Parameters
        ----------
        row_idx : int
            Index location to modify (iloc).
        sc_gids : list
            List of supply curve point GID's that were build for this node
        res_gids : list
            List of resource GID's that were built for this plexos node.
        gen_gids : list
            List of generation GID's that were built for this plexos node.
        res_built : list
            List of built capacities at each resource GID for this plexos node.
        """
        index = self._output_meta.index.values[row_idx]
        updates = {'sc_gids': sc_gids,
                   'res_gids': res_gids,
                   'gen_gids': gen_gids,
                   'res_built': res_built}

        # A None entry in res_gids marks a row that has not been written yet;
        # otherwise the new values are appended to the existing lists.
        is_first_write = self._output_meta.at[index, 'res_gids'] is None
        for col, values in updates.items():
            if is_first_write:
                self._output_meta.at[index, col] = values
            else:
                self._output_meta.at[index, col] += values

    @staticmethod
    def tz_convert_profiles(profiles, timezone):
        """Convert profiles to local time and forward/back fill missing data.

        The profiles are rolled along the time axis by the standard-time UTC
        offset of the timezone. The time steps that wrap around the end of
        the array are replaced by forward/back filling from the adjacent
        data, except for time steps that are zero at all plants.

        Parameters
        ----------
        profiles : np.ndarray
            Profiles of shape (time, n_plants) in UTC
        timezone : str
            Timezone for output generation profiles. This is a string that will
            be passed to pytz.timezone() e.g. US/Pacific, US/Mountain,
            US/Central, US/Eastern, or UTC. For a list of all available
            timezones, see pytz.all_timezones

        Returns
        -------
        profiles : np.ndarray
            Profiles of shape (time, n_plants) in timezone

        Raises
        ------
        RuntimeError
            If the profiles have fewer than 8760 time steps.
        """
        logger.info('Converting profiles timezone to {}'.format(timezone))

        if len(profiles) < 8760:
            msg = ('Cannot use profiles that are not at least hourly! '
                   'Received shape {}'.format(profiles.shape))
            logger.error(msg)
            raise RuntimeError(msg)

        steps_per_hour = len(profiles) // 8760

        # use jan 1 to avoid daylight savings
        date = pytz.timezone(timezone).localize(datetime.datetime(2011, 1, 1))

        # Use the full offset (not just the hour field of "%z") so sub-hour
        # offsets are honored when the data resolution allows. Truncation
        # toward zero keeps whole-hour behavior for hourly data.
        offset_hours = date.utcoffset().total_seconds() / 3600
        roll_int = int(steps_per_hour * offset_hours)

        # np.roll returns a new array, the input is not modified
        profiles = np.roll(profiles, roll_int, axis=0)

        if roll_int < 0:
            # the start of the year wrapped around to the last rows
            wrapped = profiles[roll_int:]
            fill = 'ffill'
        elif roll_int > 0:
            # the end of the year wrapped around to rows 0 .. roll_int - 1
            wrapped = profiles[:roll_int]
            fill = 'bfill'
        else:
            return profiles

        # don't fill nighttime for solar (rows that are zero at all plants).
        # "wrapped" is a view so this writes into profiles.
        to_fill = (wrapped != 0).any(axis=1)
        wrapped[to_fill] = np.nan

        profiles = pd.DataFrame(profiles)
        profiles = profiles.ffill() if fill == 'ffill' else profiles.bfill()

        return profiles.values

    @staticmethod
    def get_unique_plant_names(table, name_col, tech_tag=None):
        """Get a list of ordered unique plant names

        Names that occur more than once get an occurrence counter appended
        (" 0", " 1", ...) in order of appearance.

        Parameters
        ----------
        table : pd.DataFrame
            Plexos / plant meta data table where every row is a plant
        name_col : str | None
            Column label in table. Exception will be raised if not found.
        tech_tag : str
            Technology tag to append to plant names like "pv" or "wind"

        Returns
        -------
        names : list | None
            List of unique plant names, None if name_col is None.

        Raises
        ------
        KeyError
            If name_col is not in table.
        """
        if name_col is None:
            return None

        if name_col not in table:
            msg = ('Could not find requested name column "{}" in plexos '
                   'table, the available columns are: {}'
                   .format(name_col, sorted(table.columns.values)))
            logger.error(msg)
            raise KeyError(msg)

        names = table[name_col].values.tolist()

        if tech_tag is not None:
            names = [name + f' {tech_tag}' for name in names]

        # Single pass with a running occurrence count per duplicated name
        totals = Counter(names)
        n_seen = Counter()
        unique_names = []
        for name in names:
            if totals[name] > 1:
                unique_names.append(name + f' {n_seen[name]}')
                n_seen[name] += 1
            else:
                unique_names.append(name)

        return unique_names

    def export(self, meta, time_index, profiles, out_fpath):
        """Export generation profiles to h5 and plexos-formatted csv

        Parameters
        ----------
        meta : pd.DataFrame
            Plant / plexos node meta data with built capacities and mappings to
            the resource used.
        time_index : pd.datetimeindex
            Time index for the profiles.
        profiles : np.ndarray
            Generation profile timeseries in MW at each plant / plexos node.
        out_fpath : str
            Path to .h5 file into which plant buildout should be saved. The
            timezone alias is appended to the file name. A plexos-formatted
            csv will also be written in the same directory.
        """
        h5_ext = '.h5'
        if not out_fpath.endswith(h5_ext):
            out_fpath = out_fpath + h5_ext

        # Only swap the trailing extension. str.replace would also rewrite
        # any ".h5" that happens to appear earlier in the path.
        base_fpath = out_fpath[:-len(h5_ext)] + f'_{self.tz_alias}'
        out_fpath = base_fpath + h5_ext
        csv_fp = base_fpath + '.csv'

        logger.info('Saving result to file: {}'.format(out_fpath))

        profiles = self.tz_convert_profiles(profiles, self._timezone)

        with Outputs(out_fpath, mode='a') as out:
            out.meta = meta
            out.time_index = time_index
            out._create_dset('profiles',
                             profiles.shape,
                             profiles.dtype,
                             chunks=(None, 100),
                             data=profiles,
                             attrs={'units': 'MW'})

        names = np.arange(profiles.shape[1])
        if self._plant_name_col is not None:
            names = self.get_unique_plant_names(meta, self._plant_name_col,
                                                self._tech_tag)

        # Drop the tz info for the csv index. tz_convert raises on a
        # tz-naive index, which is already in the desired form.
        csv_index = time_index
        if getattr(time_index, 'tz', None) is not None:
            csv_index = time_index.tz_convert(None)

        df_plx = pd.DataFrame(profiles, columns=names, index=csv_index)
        df_plx.index.name = 'DATETIME'
        df_plx.to_csv(csv_fp)

        logger.info('Wrote plexos formatted profiles to: {}'.format(csv_fp))