Source code for reV.handlers.transmission

# -*- coding: utf-8 -*-
"""
Module to handle Supply Curve Transmission features
"""
import json
import logging
import numpy as np
import os
import pandas as pd
from warnings import warn

from reV.utilities import SupplyCurveField
from reV.utilities.exceptions import (HandlerWarning, HandlerKeyError,
                                      HandlerRuntimeError)

from rex.utilities.utilities import parse_table
from gaps.config import load_config

logger = logging.getLogger(__name__)


[docs]class TransmissionFeatures: """ Class to handle Supply Curve Transmission features """ def __init__(self, trans_table, line_tie_in_cost=14000, line_cost=2279, station_tie_in_cost=0, center_tie_in_cost=0, sink_tie_in_cost=1e9, avail_cap_frac=1, line_limited=False): """ Parameters ---------- trans_table : str | pandas.DataFrame Path to .csv or config file or DataFrame containing supply curve transmission mapping line_tie_in_cost : float, optional Cost of connecting to a transmission line in $/MW, by default 14000 line_cost : float, optional Cost of building transmission line during connection in $/MW-km, by default 2279 station_tie_in_cost : float, optional Cost of connecting to a substation in $/MW, by default 0 center_tie_in_cost : float, optional Cost of connecting to a load center in $/MW, by default 0 sink_tie_in_cost : float, optional Cost of connecting to a synthetic load center (infinite sink) in $/MW, by default 1e9 avail_cap_frac : float, optional Fraction of capacity that is available for connection, by default 1 line_limited : bool, optional Substation connection is limited by maximum capacity of the attached lines, legacy method, by default False """ logger.debug('Trans table input: {}'.format(trans_table)) logger.debug('Line tie in cost: {} $/MW'.format(line_tie_in_cost)) logger.debug('Line cost: {} $/MW-km'.format(line_cost)) logger.debug('Station tie in cost: {} $/MW' .format(station_tie_in_cost)) logger.debug('Center tie in cost: {} $/MW'.format(center_tie_in_cost)) logger.debug('Synthetic load center tie in cost: {} $/MW' .format(sink_tie_in_cost)) logger.debug('Available capacity fraction: {}' .format(avail_cap_frac)) logger.debug('Line limited substation connections: {}' .format(line_limited)) self._line_tie_in_cost = line_tie_in_cost self._line_cost = line_cost self._station_tie_in_cost = station_tie_in_cost self._center_tie_in_cost = center_tie_in_cost self._sink_tie_in_cost = sink_tie_in_cost self._avail_cap_frac = avail_cap_frac self._features = self._get_features(trans_table) self._feature_gid_list = list(self._features.keys()) self._available_mask = np.ones( (int(1 + max(list(self._features.keys()))), ), dtype=bool) self._line_limited = line_limited def __repr__(self): msg = "{} with {} features".format(self.__class__.__name__, len(self)) return msg def __len__(self): return len(self._features) def __getitem__(self, gid): if gid not in self._features: msg = "Invalid feature gid {}".format(gid) logger.error(msg) raise HandlerKeyError(msg) return self._features[gid] @staticmethod def _parse_dictionary(features): """ Parse features dict object or config file Parameters ---------- features : dict | str Dictionary of transmission features or path to config containing dictionary of transmission features Returns ------- features : dict Nested dictionary of features (lines, substations, loadcenters) lines : {capacity} substations : {lines} loadcenters : {capacity} """ if isinstance(features, str): if os.path.isfile(features): features = load_config(features) else: features = json.loads(features) elif not isinstance(features, dict): msg = ("Transmission features must be a config file, object, " "or a dictionary") logger.error(msg) raise ValueError(msg) return features @staticmethod def _parse_table(trans_table): """ Extract features and their capacity from supply curve transmission mapping table Parameters ---------- trans_table : str | pandas.DataFrame Path to .csv or .json containing supply curve transmission mapping Returns ------- trans_table : pandas.DataFrame DataFrame of transmission features """ try: trans_table = parse_table(trans_table) except ValueError as ex: logger.error(ex) raise trans_table = \ trans_table.rename( columns={'trans_line_gid': SupplyCurveField.TRANS_GID, 'trans_gids': 'trans_line_gids'}) contains_dist_in_miles = "dist_mi" in trans_table missing_km_dist = SupplyCurveField.DIST_SPUR_KM not in trans_table if contains_dist_in_miles and missing_km_dist: trans_table = trans_table.rename( columns={"dist_mi": SupplyCurveField.DIST_SPUR_KM} ) trans_table[SupplyCurveField.DIST_SPUR_KM] *= 1.60934 return trans_table def _features_from_table(self, trans_table): """ Extract features and their capacity from supply curve transmission mapping table Parameters ---------- trans_table : pandas.DataFrame DataFrame of transmission features Returns ------- features : dict Nested dictionary of features (lines, substations, loadcenters) lines : {capacity} substations : {lines} loadcenters : {capacity} """ features = {} cap_frac = self._avail_cap_frac trans_features = trans_table.groupby(SupplyCurveField.TRANS_GID) trans_features = trans_features.first() for gid, feature in trans_features.iterrows(): name = feature[SupplyCurveField.TRANS_TYPE].lower() feature_dict = {'type': name} if name == 'transline': feature_dict[SupplyCurveField.TRANS_CAPACITY] = ( feature['ac_cap'] * cap_frac ) elif name == 'substation': feature_dict['lines'] = json.loads(feature['trans_line_gids']) elif name == 'loadcen': feature_dict[SupplyCurveField.TRANS_CAPACITY] = ( feature['ac_cap'] * cap_frac ) elif name == 'pcaloadcen': feature_dict[SupplyCurveField.TRANS_CAPACITY] = None else: msg = ('Cannot not recognize feature type "{}" ' 'for trans gid {}!'.format(name, gid)) logger.error(msg) raise HandlerKeyError(msg) features[gid] = feature_dict return features def _get_features(self, trans_table): """ Create transmission features dictionary either from supply curve transmission mapping or from pre-created dictionary Parameters ---------- trans_table : str Path to .csv or .json containing supply curve transmission mapping Returns ------- features : dict Nested dictionary of features (lines, substations, loadcenters) lines : {capacity} substations : {lines} loadcenters : {capacity} """ trans_table = self._parse_table(trans_table) features = self._features_from_table(trans_table) return features
[docs] def check_feature_dependencies(self): """Check features for dependencies that are missing and raise error.""" missing = {} for gid, feature_dict in self._features.items(): for line_gid in feature_dict.get('lines', []): if line_gid not in self._features: if gid not in missing: missing[gid] = [] missing[gid].append(line_gid) if any(missing): emsg = ('Transmission feature table has {} parent features that ' 'depend on missing lines. Missing dependencies: {}' .format(len(missing), missing)) logger.error(emsg) raise RuntimeError(emsg)
@staticmethod def _calc_cost(distance, line_cost=2279, tie_in_cost=0, transmission_multiplier=1): """ Compute transmission cost in $/MW Parameters ---------- distance : float Distance to feature in kms line_tie_in_cost : float, optional Cost of connecting to a transmission line in $/MW, by default 14000 tie_in_cost : float, optional Cost to tie in line to feature in $/MW, by default 0 tranmission_multiplier : float, optional Multiplier for region specific line cost increases, by default 1 Returns ------- cost : float Cost of transmission in $/MW """ cost = (distance * line_cost * transmission_multiplier + tie_in_cost) return cost def _substation_capacity(self, gid, line_gids): """ Get capacity of a substation from its tranmission lines Parameters ---------- gid : int Substation gid line_gids : list List of transmission line gids connected to the substation Returns ------- avail_cap : float Substation available capacity """ try: line_caps = [self[l_gid][SupplyCurveField.TRANS_CAPACITY] for l_gid in line_gids] except HandlerKeyError as e: msg = ('Could not find capacities for substation gid {} and ' 'connected lines: {}'.format(gid, line_gids)) logger.error(msg) raise HandlerKeyError(msg) from e avail_cap = sum(line_caps) / 2 if self._line_limited: max_cap = max(line_caps) / 2 if max_cap < avail_cap: avail_cap = max_cap return avail_cap
[docs] def available_capacity(self, gid): """ Get available capacity for given line Parameters ---------- gid : int Unique id of feature of interest Returns ------- avail_cap : float Available capacity = capacity * available fraction default = 100% """ feature = self[gid] if SupplyCurveField.TRANS_CAPACITY in feature: avail_cap = feature[SupplyCurveField.TRANS_CAPACITY] elif 'lines' in feature: avail_cap = self._substation_capacity(gid, feature['lines']) else: msg = ('Could not parse available capacity from feature: {}' .format(feature)) logger.error(msg) raise HandlerRuntimeError(msg) return avail_cap
def _update_availability(self, gid): """ Check features available capacity, if its 0 update _available_mask Parameters ---------- gid : list Feature gid to check """ avail_cap = self.available_capacity(gid) if avail_cap == 0: self._available_mask[gid] = False
[docs] def check_availability(self, gid): """ Check availablity of feature with given gid Parameters ---------- gid : int Feature gid to check Returns ------- bool Whether the gid is available or not """ return self._available_mask[gid]
def _connect(self, gid, capacity): """ Connect to a standalone transmission feature (not a substation) and decrement the feature's available capacity. Raise exception if not able to connect. Parameters ---------- gid : int Feature gid to connect to capacity : float Capacity needed in MW """ avail_cap = self[gid][SupplyCurveField.TRANS_CAPACITY] if avail_cap < capacity: msg = ("Cannot connect to {}: " "needed capacity({} MW) > " "available capacity({} MW)" .format(gid, capacity, avail_cap)) logger.error(msg) raise RuntimeError(msg) self[gid][SupplyCurveField.TRANS_CAPACITY] -= capacity def _fill_lines(self, line_gids, line_caps, capacity): """ Fill any lines that cannot handle equal portion of capacity and remove from lines to be filled and capacity needed Parameters ---------- line_gids : ndarray Vector of transmission line gids connected to the substation line_caps : ndarray Vector of available capacity of the transmission lines capacity : float Capacity needed in MW Returns ---------- line_gids : ndarray Transmission lines with available capacity line_caps : ndarray Capacity of lines with available capacity capacity : float Updated capacity needed to be applied to substation in MW """ apply_cap = capacity / len(line_gids) mask = line_caps < apply_cap for pos in np.where(line_caps < apply_cap)[0]: gid = line_gids[pos] apply_cap = line_caps[pos] self._connect(gid, apply_cap) capacity -= apply_cap return line_gids[~mask], line_caps[~mask], capacity def _spread_substation_load(self, line_gids, line_caps, capacity): """ Spread needed capacity over all lines connected to substation Parameters ---------- line_gids : ndarray Vector of transmission line gids connected to the substation line_caps : ndarray Vector of available capacity of the transmission lines capacity : float Capacity needed to be applied to substation in MW """ while True: lines, line_caps, capacity = self._fill_lines(line_gids, line_caps, capacity) if len(lines) < len(line_gids): line_gids = lines else: break apply_cap = capacity / len(lines) for gid in lines: self._connect(gid, apply_cap) def _connect_to_substation(self, line_gids, capacity): """ Connect to substation and update internal dictionary accordingly Parameters ---------- line_gids : list List of transmission line gids connected to the substation capacity : float Capacity needed in MW line_lmited : bool Substation connection is limited by maximum capacity of the attached lines """ line_caps = np.array([self[gid][SupplyCurveField.TRANS_CAPACITY] for gid in line_gids]) if self._line_limited: gid = line_gids[np.argmax(line_caps)] self._connect(gid, capacity) else: non_zero = np.nonzero(line_caps)[0] line_gids = np.array([line_gids[i] for i in non_zero]) line_caps = line_caps[non_zero] self._spread_substation_load(line_gids, line_caps, capacity)
[docs] def connect(self, gid, capacity, apply=True): """ Check if you can connect to given feature If apply, update internal dictionary accordingly Parameters ---------- gid : int Unique id of feature of intereset capacity : float Capacity needed in MW apply : bool Apply capacity to feature with given gid and update internal dictionary Returns ------- connected : bool Flag as to whether connection is possible or not """ if self.check_availability(gid): avail_cap = self.available_capacity(gid) if avail_cap is not None and capacity > avail_cap: connected = False else: connected = True if apply: feature_type = self[gid]['type'] if feature_type == 'transline': self._connect(gid, capacity) elif feature_type == 'substation': lines = self[gid]['lines'] self._connect_to_substation(lines, capacity) elif feature_type == 'loadcen': self._connect(gid, capacity) self._update_availability(gid) else: connected = False return connected
[docs] def cost(self, gid, distance, transmission_multiplier=1, capacity=None): """ Compute levelized cost of transmission (LCOT) for connecting to give feature Parameters ---------- gid : int Feature gid to connect to distance : float Distance to feature in kms line_multiplier : float Multiplier for region specific line cost increases capacity : float Capacity needed in MW, if None DO NOT check if connection is possible Returns ------- cost : float Cost of transmission in $/MW, if None indicates connection is NOT possible """ feature_type = self[gid]['type'] line_cost = self._line_cost if feature_type == 'transline': tie_in_cost = self._line_tie_in_cost elif feature_type == 'substation': tie_in_cost = self._station_tie_in_cost elif feature_type == 'loadcen': tie_in_cost = self._center_tie_in_cost elif feature_type == 'pcaloadcen': tie_in_cost = self._sink_tie_in_cost else: tie_in_cost = 0 msg = ("Do not recognize feature type {}, tie_in_cost set to 0" .format(feature_type)) logger.warning(msg) warn(msg, HandlerWarning) cost = self._calc_cost(distance, line_cost=line_cost, tie_in_cost=tie_in_cost, transmission_multiplier=transmission_multiplier) if capacity is not None: if not self.connect(gid, capacity, apply=False): cost = None return cost
[docs] @classmethod def feature_capacity(cls, trans_table, avail_cap_frac=1): """ Compute available capacity for all features Parameters ---------- trans_table : str | pandas.DataFrame Path to .csv or .json containing supply curve transmission mapping avail_cap_frac : float, optional Fraction of capacity that is available for connection, by default 1 Returns ------- feature_cap : pandas.DataFrame Available Capacity for each transmission feature """ try: feature = cls(trans_table, avail_cap_frac=avail_cap_frac) feature_cap = {} for gid, _ in feature._features.items(): feature_cap[gid] = feature.available_capacity(gid) except Exception: logger.exception("Error computing available capacity for all " "features in {}".format(cls)) raise feature_cap = pd.Series(feature_cap) feature_cap.name = SupplyCurveField.TRANS_CAPACITY feature_cap.index.name = SupplyCurveField.TRANS_GID feature_cap = feature_cap.to_frame().reset_index() return feature_cap
[docs]class TransmissionCosts(TransmissionFeatures): """ Class to compute supply curve -> transmission feature costs """ def _features_from_table(self, trans_table): """ Extract features and their capacity from supply curve transmission mapping table and pre-compute the available capacity of each feature Parameters ---------- trans_table : pandas.DataFrame DataFrame of transmission features Returns ------- features : dict Nested dictionary of features (lines, substations, loadcenters) lines : {capacity} substations : {lines} loadcenters : {capacity} """ features = {} if SupplyCurveField.TRANS_CAPACITY not in trans_table: kwargs = {'avail_cap_frac': self._avail_cap_frac} fc = TransmissionFeatures.feature_capacity(trans_table, **kwargs) trans_table = trans_table.merge(fc, on=SupplyCurveField.TRANS_GID) trans_features = trans_table.groupby(SupplyCurveField.TRANS_GID) trans_features = trans_features.first() for gid, feature in trans_features.iterrows(): name = feature[SupplyCurveField.TRANS_TYPE].lower() feature_dict = {'type': name, SupplyCurveField.TRANS_CAPACITY: ( feature[SupplyCurveField.TRANS_CAPACITY] )} features[gid] = feature_dict return features
[docs] def available_capacity(self, gid): """ Get available capacity for given line Parameters ---------- gid : int Unique id of feature of interest Returns ------- avail_cap : float Available capacity = capacity * available fraction default = 100% """ return self[gid][SupplyCurveField.TRANS_CAPACITY]
[docs] @classmethod def feature_costs(cls, trans_table, capacity=None, line_tie_in_cost=14000, line_cost=2279, station_tie_in_cost=0, center_tie_in_cost=0, sink_tie_in_cost=1e9, avail_cap_frac=1, line_limited=False): """ Compute costs for all connections in given transmission table Parameters ---------- trans_table : str | pandas.DataFrame Path to .csv or .json containing supply curve transmission mapping capacity : float Capacity needed in MW, if None DO NOT check if connection is possible line_tie_in_cost : float, optional Cost of connecting to a transmission line in $/MW, by default 14000 line_cost : float, optional Cost of building transmission line during connection in $/MW-km, by default 2279 station_tie_in_cost : float, optional Cost of connecting to a substation in $/MW, by default 0 center_tie_in_cost : float, optional Cost of connecting to a load center in $/MW, by default 0 sink_tie_in_cost : float, optional Cost of connecting to a synthetic load center (infinite sink) in $/MW, by default 1e9 avail_cap_frac : float, optional Fraction of capacity that is available for connection, by default 1 line_limited : bool, optional Substation connection is limited by maximum capacity of the attached lines, legacy method, by default False Returns ------- cost : ndarray Cost of transmission in $/MW, if None indicates connection is NOT possible """ try: feature = cls(trans_table, line_tie_in_cost=line_tie_in_cost, line_cost=line_cost, station_tie_in_cost=station_tie_in_cost, center_tie_in_cost=center_tie_in_cost, sink_tie_in_cost=sink_tie_in_cost, avail_cap_frac=avail_cap_frac, line_limited=line_limited) costs = [] for _, row in trans_table.iterrows(): tm = row.get('transmission_multiplier', 1) costs.append(feature.cost(row[SupplyCurveField.TRANS_GID], row[SupplyCurveField.DIST_SPUR_KM], capacity=capacity, transmission_multiplier=tm)) except Exception: logger.exception("Error computing costs for all connections in {}" .format(cls)) raise return np.array(costs, dtype='float32')