# -*- coding: utf-8 -*-
"""
Created on Wed Aug 21 13:47:43 2019
@author: gbuster
"""
from concurrent.futures import as_completed
import json
import logging
import numpy as np
import pandas as pd
from sklearn.neighbors import BallTree
from rex.utilities.execution import SpawnProcessPool
from rex.utilities.utilities import parse_table
from reVX.plexos.base import BaseProfileAggregation, PlexosNode
from reVX.plexos.utilities import get_coord_labels
logger = logging.getLogger(__name__)
[docs]class SimplePlantBuilder(BaseProfileAggregation):
    """Class to build generation profiles for "plants" by aggregating resource
    from nearest neighbor supply curve points.

    Plants are processed in the row order of the plant meta data and each
    plant draws capacity from supply curve points in the order given by the
    node map. Capacity assigned to a plant is deducted from the supply curve
    table held by this instance, so it is not available to later plants.
    """
def __init__(self, plant_meta, rev_sc, cf_fpath, forecast_fpath=None,
             plant_name_col=None, tech_tag=None, timezone='UTC',
             share_resource=True, max_workers=None):
    """Parse and validate the inputs and precompute the lookup data.

    Parameters
    ----------
    plant_meta : str | pd.DataFrame
        Str filepath or extracted dataframe for plant meta data with every
        row representing a plant with columns for latitude, longitude,
        and capacity (in MW). Plants will compete for available capacity
        in the reV supply curve input and will be prioritized based on the
        row order of this input.
    rev_sc : str | pd.DataFrame
        reV supply curve or sc-aggregation output table including sc_gid,
        latitude, longitude, res_gids, gid_counts, mean_cf, capacity.
    cf_fpath : str
        File path to capacity factor file (reV gen output) to
        get profiles from.
    forecast_fpath : str | None
        Forecasted capacity factor .h5 file path (reV results).
        If not None, the generation profiles are sourced from this file.
    plant_name_col : str | None
        Column in plant_meta that has the plant name that should be used
        in the plexos output csv column headers.
    tech_tag : str | None
        Optional technology tag to include as a suffix in the plexos output
        csv column headers.
    timezone : str
        Timezone for output generation profiles. This is a string that will
        be passed to pytz.timezone() e.g. US/Pacific, US/Mountain,
        US/Central, US/Eastern, or UTC. For a list of all available
        timezones, see pytz.all_timezones
    share_resource : bool
        Flag to share available capacity within a single resource GID
        between multiple plants.
    max_workers : int | None
        Max workers for parallel profile aggregation. None uses all
        available workers. 1 will run in serial.

    Raises
    ------
    ValueError
        If the rev_sc or plant_meta table is missing a required column.
    """
    logger.info('Initializing SimplePlantBuilder.')
    super().__init__()

    self._res_gids = None
    # Both tables get a clean 0..n-1 index so that positional indices from
    # the node map can be used as index labels.
    self._plant_meta = parse_table(plant_meta).reset_index(drop=True)
    self._sc_table = parse_table(rev_sc).reset_index(drop=True)
    self._cf_fpath = cf_fpath
    self._forecast_fpath = forecast_fpath
    self._share_res = share_resource
    self._output_meta = None
    self._plant_name_col = plant_name_col
    self._tech_tag = tech_tag
    self._timezone = timezone
    self.max_workers = max_workers

    # 'capacity' is required because the per-gid capacities and the
    # buildout assignment both read it from the supply curve table.
    required = ('sc_gid', 'latitude', 'longitude', 'res_gids',
                'gid_counts', 'mean_cf', 'capacity')
    # Collect the column NAMES so the error message is actionable.
    missing = [r for r in required if r not in self._sc_table]
    if missing:
        msg = ('SimplePlantBuilder needs the following missing columns '
               'in the rev_sc input: {}'.format(missing))
        logger.error(msg)
        raise ValueError(msg)

    required = ('latitude', 'longitude', 'capacity')
    missing = [r for r in required if r not in self._plant_meta]
    if missing:
        msg = ('SimplePlantBuilder needs the following missing columns '
               'in the plant_meta input: {}'.format(missing))
        logger.error(msg)
        raise ValueError(msg)

    self._node_map = self._make_node_map()
    self._forecast_map = self._make_forecast_map(self._cf_fpath,
                                                 self._forecast_fpath)
    self._compute_gid_capacities()
    logger.info('Finished initializing SimplePlantBuilder.')
def _compute_gid_capacities(self):
    """Add a 'gid_capacity' column to the SC table.

    String-encoded 'res_gids' / 'gid_counts' columns are first decoded with
    json. Each SC point capacity is then split between its resource gids
    in proportion to the gid counts and stored as a list per row.
    """
    sc = self._sc_table

    for col in ('res_gids', 'gid_counts'):
        first_entry = sc[col].values[0]
        if isinstance(first_entry, str):
            sc[col] = sc[col].apply(json.loads)

    sc['gid_capacity'] = None
    for idx, sc_row in sc.iterrows():
        counts = sc_row['gid_counts']
        # fraction of the SC point held by each resource gid
        share = counts / np.sum(counts)
        sc.at[idx, 'gid_capacity'] = list(share * sc_row['capacity'])
def _make_node_map(self):
    """Run haversine balltree to map plant locations to nearest supply
    curve points

    Returns
    -------
    ind : np.ndarray
        BallTree (haversine) query output, (n, m) array of plant indices
        mapped to the SC points where n is the number of plants, m is the
        number of SC points, and each row in the array yields the sc points
        m closest to the plant n.
    """
    logger.debug('Making node map...')
    plant_coord_labels = get_coord_labels(self._plant_meta)
    sc_coord_labels = get_coord_labels(self._sc_table)

    # The haversine metric operates on coordinates in radians.
    # pylint: disable=not-callable
    sc_coords = np.radians(self._sc_table[sc_coord_labels].values)
    plant_coords = np.radians(self._plant_meta[plant_coord_labels].values)

    tree = BallTree(sc_coords, metric='haversine')
    # k is the full SC table length so every plant gets a complete ranking
    # of all SC points.
    ind = tree.query(plant_coords, return_distance=False,
                     k=len(self._sc_table))
    logger.debug('Finished making node map.')

    return ind
@property
def plant_meta(self):
    """Plant meta data for the requested buildout, extended with buildout
    information columns.

    The output table is created lazily on first access as a copy of the
    input plant meta data with the columns 'sc_gids', 'res_gids',
    'gen_gids' and 'res_built' initialized to None, and is cached after.

    Returns
    -------
    pd.DataFrame
    """
    if self._output_meta is not None:
        return self._output_meta

    meta = self._plant_meta.copy()
    for col in ('sc_gids', 'res_gids', 'gen_gids', 'res_built'):
        meta[col] = None
    self._output_meta = meta

    return self._output_meta
def assign_plant_buildouts(self):
    """March through the plant meta data and make subsets of the supply
    curve table that will be built out for each plant. The supply curve
    table attribute of this SimplePlantBuilder instance will be manipulated
    such that total sc point capacity and resource gid capacity is reduced
    whenever a plant is built. In this fashion, resource in SC points will
    not be double counted, but resource within an SC point can be divided
    up between multiple plants. Resource within an SC point is prioritized
    by available capacity.

    A plant that cannot be built to its full capacity (the supply curve is
    exhausted) keeps whatever capacity was assigned to it and a warning is
    logged. A plant for which nothing was built has no entry in the output.

    Returns
    -------
    plant_sc_builds : dict
        Dictionary mapping the plant row indices (keys) to subsets of the
        SC table showing what should be built for each plant. The subset
        SC tables in this dict will no longer match the sc table attribute
        of the SimplePlantBuilder instance, because the tables in this dict
        show what should be built, and the sc table attribute will show
        what is remaining.
    """
    plant_sc_builds = {}
    # A set because membership is tested once per resource gid when
    # resource sharing is disabled.
    built_res_gids = set()
    n_plants = len(self._plant_meta)

    # March through plant meta data table in order provided
    for i, plant_row in self._plant_meta.iterrows():
        logger.debug('Starting plant buildout assignment for plant {} '
                     'out of {}'.format(i + 1, n_plants))

        plant_cap_to_build = float(plant_row['capacity'])
        # One single-row frame per SC point built; concatenated once below.
        built_frames = []

        # March through the SC table in order of the node map
        for sc_loc in self.node_map[i]:
            sc_point = self._sc_table.loc[sc_loc].copy()
            sc_capacity = sc_point['capacity']

            # Buildout capacity in this sc point
            if sc_capacity >= 0:
                # Make arrays of gid capacities that will be built
                # for this plant and also saved for subsequent plants.
                sc_point_res_gids = np.array(sc_point['res_gids'])
                cap_orig = np.array(sc_point['gid_capacity'], dtype=float)
                cap_remain = cap_orig.copy()
                cap_build = np.zeros_like(cap_orig)

                # Build greatest available capacity first
                order = np.flip(np.argsort(cap_orig))
                for j in order:
                    res_gid = sc_point_res_gids[j]
                    if self._share_res or res_gid not in built_res_gids:
                        # add built capacity to the "to build" array
                        # (on a resource point per sc point basis)
                        # and remove from the "remaining" array
                        built = np.minimum(plant_cap_to_build, cap_orig[j])
                        cap_build[j] += built
                        cap_remain[j] -= built
                        plant_cap_to_build -= built
                        # Only a gid that actually contributed capacity
                        # counts as built.
                        if built > 0:
                            built_res_gids.add(res_gid)

                    # buildout for this plant is fully complete
                    if plant_cap_to_build <= 0:
                        break

                if cap_build.sum() > 0:
                    # Capacity was built in this SC point
                    cap_build = cap_build.tolist()
                    built_total = sum(cap_build)

                    sc_point['capacity'] = built_total
                    sc_point['built_capacity'] = built_total
                    sc_point['gid_capacity'] = cap_build
                    built_frames.append(sc_point.to_frame().T)

                    # Deduct what was built from what remains available;
                    # stored as a list like the rest of the column.
                    self._sc_table.at[sc_loc, 'capacity'] -= built_total
                    self._sc_table.at[sc_loc, 'gid_capacity'] = \
                        cap_remain.tolist()

            # buildout for this plant is fully complete
            if plant_cap_to_build <= 0:
                break

        if plant_cap_to_build > 0:
            # The node map was exhausted before the plant was complete.
            logger.warning('Plant index {} requested {} MW but {} MW could '
                           'not be built from the available supply curve '
                           'capacity.'
                           .format(i, float(plant_row['capacity']),
                                   plant_cap_to_build))

        if built_frames:
            plant_sc_builds[i] = pd.concat(built_frames, axis=0)

    logger.info('Finished plant buildout assignment.')

    return plant_sc_builds
def check_valid_buildouts(self, plant_sc_builds):
    """Check that plant buildouts are mapped to valid resource data that
    can be found in the cf_fpath input.

    Parameters
    ----------
    plant_sc_builds : dict
        Dictionary mapping plant row indices to the SC table subsets that
        are built for each plant. Each table needs 'res_gids' and
        'gid_capacity' columns holding one sequence per row.

    Raises
    ------
    RuntimeError
        If the resource gid and gid capacity entries of a plant do not line
        up, if a plant was built on a resource gid that is not in the
        available resource gids, or if a resource gid is built by more than
        one plant while resource sharing is disabled.
    """
    # Sets give constant-time membership tests for every gid checked.
    available = set(self.available_res_gids)
    global_built_res_gids = set()

    for i, single_plant_sc in plant_sc_builds.items():
        sc_res_gids = single_plant_sc['res_gids'].values.tolist()
        sc_res_gids = [g for subset in sc_res_gids for g in subset]
        gid_caps = single_plant_sc['gid_capacity'].values.tolist()
        gid_caps = [g for subset in gid_caps for g in subset]

        if len(gid_caps) != len(sc_res_gids):
            msg = ('Plant index {} has {} resource gids but {} gid '
                   'capacities.'
                   .format(i, len(sc_res_gids), len(gid_caps)))
            logger.error(msg)
            raise RuntimeError(msg)

        # Only gids that contribute capacity count as built.
        plant_built_res_gids = [gid for gid, cap
                                in zip(sc_res_gids, gid_caps) if cap > 0]

        # Test list emptiness, not any(): resource gid 0 is falsy but is a
        # perfectly valid gid that must still be reported.
        missing = [gid for gid in plant_built_res_gids
                   if gid not in available]
        if missing:
            msg = ('Plant index {} was mapped to resource gids that are '
                   'missing from the cf file: {}'.format(i, missing))
            logger.error(msg)
            raise RuntimeError(msg)

        shared = [gid for gid in plant_built_res_gids
                  if gid in global_built_res_gids]
        if shared and not self._share_res:
            msg = ('SimplePlantBuilder shared resource gids when it '
                   'should not have: {}'.format(shared))
            logger.error(msg)
            raise RuntimeError(msg)

        global_built_res_gids.update(plant_built_res_gids)
def make_profiles(self, plant_sc_builds):
    """Build the 2D array of aggregated plant generation profiles.

    Dispatches to the serial implementation when max_workers is 1 and to
    the parallel implementation otherwise.

    Parameters
    ----------
    plant_sc_builds : dict
        Plant buildouts, passed through unchanged to the implementation.

    Returns
    -------
    profiles : np.ndarray
        (t, n) array of plant generation profiles where t is the
        timeseries length and n is the number of plants.
    """
    run_serial = self.max_workers == 1
    builder = (self._make_profiles_serial if run_serial
               else self._make_profiles_parallel)

    return builder(plant_sc_builds)
def _make_profiles_parallel(self, plant_sc_builds):
    """Make a 2D array of aggregated plant gen profiles in parallel.

    Parameters
    ----------
    plant_sc_builds : dict
        Dictionary mapping plant row indices to the SC table subsets that
        are built for each plant.

    Returns
    -------
    profiles : np.ndarray
        (t, n) array of plant node generation profiles where t is the
        timeseries length and n is the number of plants.
    """
    logger.info('Starting plant profile buildout in parallel.')
    profiles = self._init_output(len(self.plant_meta))
    progress = 0
    futures = {}
    loggers = [__name__, 'reVX']

    with SpawnProcessPool(max_workers=self.max_workers,
                          loggers=loggers) as exe:
        for i, plant_sc_subset in plant_sc_builds.items():
            future = exe.submit(PlexosNode.run,
                                plant_sc_subset, self._cf_fpath,
                                res_gids=self.available_res_gids,
                                forecast_fpath=self._forecast_fpath,
                                forecast_map=self._forecast_map)
            # Remember which plant column each future belongs to because
            # results arrive in completion order.
            futures[future] = i

        n_futures = len(futures)
        for n, future in enumerate(as_completed(futures), start=1):
            i = futures[future]
            profile, sc_gids, res_gids, gen_gids, res_built = \
                future.result()
            profiles[:, i] = profile
            self._ammend_output_meta(i, sc_gids, res_gids, gen_gids,
                                     res_built)

            # Integer percent: exact, and logs "33" rather than "33.0".
            current_prog = 100 * n // n_futures
            if current_prog > progress:
                progress = current_prog
                logger.info('{} % of plant node profiles built.'
                            .format(progress))

    logger.info('Finished plant profile buildout.')

    return profiles
def _make_profiles_serial(self, plant_sc_builds):
    """Make a 2D array of aggregated plant gen profiles in serial.

    Parameters
    ----------
    plant_sc_builds : dict
        Dictionary mapping plant row indices to the SC table subsets that
        are built for each plant.

    Returns
    -------
    profiles : np.ndarray
        (t, n) array of plant generation profiles where t is the
        timeseries length and n is the number of plants.
    """
    logger.info('Starting plant profile buildout in serial.')
    profiles = self._init_output(len(self.plant_meta))
    progress = 0
    n_builds = len(plant_sc_builds)

    # n counts completed plants; i is the plant row index / output column.
    for n, (i, plant_sc_subset) in enumerate(plant_sc_builds.items(),
                                             start=1):
        p = PlexosNode.run(
            plant_sc_subset, self._cf_fpath,
            res_gids=self.available_res_gids,
            forecast_fpath=self._forecast_fpath,
            forecast_map=self._forecast_map)

        profile, sc_gids, res_gids, gen_gids, res_built = p
        profiles[:, i] = profile
        self._ammend_output_meta(i, sc_gids, res_gids, gen_gids, res_built)

        # Progress is relative to the number of plants being built.
        current_prog = 100 * n // n_builds
        if current_prog > progress:
            progress = current_prog
            logger.info('{} % of plant profiles built.'
                        .format(progress))

    logger.info('Finished plant profile buildout.')

    return profiles
@classmethod
def run(cls, plant_meta, rev_sc, cf_fpath, forecast_fpath=None,
        plant_name_col=None, tech_tag=None, timezone='UTC',
        share_resource=True, max_workers=None, out_fpath=None):
    """Build profiles and meta data.

    Creates a builder instance, assigns and validates the plant buildouts,
    aggregates the profiles and optionally exports the results.

    Parameters
    ----------
    plant_meta : str | pd.DataFrame
        Str filepath or extracted dataframe for plant meta data with every
        row representing a plant with columns for latitude, longitude,
        and capacity (in MW). Plants will compete for available capacity
        in the reV supply curve input and will be prioritized based on the
        row order of this input.
    rev_sc : str | pd.DataFrame
        reV supply curve or sc-aggregation output table including sc_gid,
        latitude, longitude, res_gids, gid_counts, mean_cf.
    cf_fpath : str
        File path to capacity factor file (reV gen output) to
        get profiles from.
    forecast_fpath : str | None
        Forecasted capacity factor .h5 file path (reV results).
        If not None, the generation profiles are sourced from this file.
    plant_name_col : str | None
        Column in plant_meta that has the plant name that should be used
        in the plexos output csv column headers.
    tech_tag : str | None
        Optional technology tag to include as a suffix in the plexos output
        csv column headers.
    timezone : str
        Timezone for output generation profiles. This is a string that will
        be passed to pytz.timezone() e.g. US/Pacific, US/Mountain,
        US/Central, US/Eastern, or UTC. For a list of all available
        timezones, see pytz.all_timezones
    share_resource : bool
        Flag to share available capacity within a single resource GID
        between multiple plants.
    max_workers : int | None
        Max workers for parallel profile aggregation. None uses all
        available workers. 1 will run in serial.
    out_fpath : str, optional
        Path to .h5 file into which plant buildout should be saved. A
        plexos-formatted csv will also be written in the same directory.
        By default None.

    Returns
    -------
    plant_meta : pd.DataFrame
        Plant meta data with built capacities and mappings to the
        resource used.
    time_index : pd.datetimeindex
        Time index for the profiles.
    profiles : np.ndarray
        Generation profile timeseries in MW at each plant.
    """
    init_kwargs = {'forecast_fpath': forecast_fpath,
                   'plant_name_col': plant_name_col,
                   'tech_tag': tech_tag,
                   'timezone': timezone,
                   'share_resource': share_resource,
                   'max_workers': max_workers}
    builder = cls(plant_meta, rev_sc, cf_fpath, **init_kwargs)

    buildouts = builder.assign_plant_buildouts()
    builder.check_valid_buildouts(buildouts)
    profiles = builder.make_profiles(buildouts)

    if out_fpath is not None:
        builder.export(builder.plant_meta, builder.time_index, profiles,
                       out_fpath)

    return builder.plant_meta, builder.time_index, profiles