# -*- coding: utf-8 -*-
"""
Module to compute least cost transmission paths, distances, and costs
for a clipped area.
"""
import geopandas as gpd
import logging
import numpy as np
import pandas as pd
import rasterio
from shapely.geometry import Polygon, Point
from shapely.geometry.linestring import LineString
from shapely.ops import nearest_points
from skimage.graph import MCP_Geometric
import time
from warnings import warn
from reV.handlers.exclusions import ExclusionLayers
from reVX.least_cost_xmission.config import (XmissionConfig, TRANS_LINE_CAT,
SINK_CAT, SUBSTATION_CAT,
LOAD_CENTER_CAT)
from reVX.utilities.exceptions import (InvalidMCPStartValueError,
LeastCostPathNotFoundError)
logger = logging.getLogger(__name__)
[docs]class TieLineCosts:
"""
Compute Least Cost Tie-line cost from start location to desired end
locations
"""
def __init__(self, cost_fpath, start_indices, capacity_class, row_slice,
col_slice, xmission_config=None, barrier_mult=100):
"""
Parameters
----------
cost_fpath : str
Full path to HDF5 file containing cost arrays. The cost data
layers should be named ``"tie_line_costs_{capacity}MW"``,
where ``capacity`` is an integer value representing the
capacity of the line (the integer values must matches at
least one of the integer capacity values represented by the
keys in the ``power_classes`` portion of the transmission
config).
start_indices : tuple
Tuple of (row_idx, col_idx) in the **clipped** cost array
(see `row_slice` and `col_slice` definitions below)
indicating the start position of all paths to compute
(typically, this is the centroid of the supply curve cell).
Paths will be computed from this start location to each of
the ``end_indices``, which are also locations in the cost
array (typically transmission feature locations).
capacity_class : : int | str
Integer value representing the transmission feature
capacity. The integer values must matches at least one of
the integer capacity values represented by the keys in the
``power_classes`` portion of the transmission config.
row_slice, col_slice : slice
Slices into the cost raster array used to clip the area that
should be considered when computing a least cost path. This
can be used to limit the consideration space and speed up
computation. Note that the start and end indices must
be given w.r.t. the cost raster that is "clipped" using
these slices.
xmission_config : str | dict | XmissionConfig, optional
Path to transmission config JSON files, dictionary of
transmission config JSONs, or preloaded XmissionConfig
objects. If ``None``, the default config is used.
By default, ``None``.
barrier_mult : int, optional
Multiplier applied to the cost data to create transmission
barrier costs. By default, ``100``.
"""
self._cost_fpath = cost_fpath
self._config = self._parse_config(xmission_config=xmission_config)
self._start_indices = start_indices
self._row_slice = row_slice
self._col_slice = col_slice
self._capacity_class = self._config._parse_cap_class(capacity_class)
self._line_cap_mw = self._config['power_classes'][self.capacity_class]
cost_layer = 'tie_line_costs_{}MW'.format(self._line_cap_mw)
self._cost, self._mcp_cost = self._clip_costs(
cost_fpath, cost_layer, row_slice, col_slice,
barrier_mult=barrier_mult)
self._mcp = None
self._clip_shape = None
with ExclusionLayers(self._cost_fpath) as f:
self.transform = rasterio.Affine(*f.profile['transform'])
self._full_shape = f.shape
def __repr__(self):
msg = "{} starting at {}".format(self.__class__.__name__,
self._start_indices)
return msg
@property
def row_offset(self):
"""
Offset to apply to row indices to move into clipped array
Returns
-------
int
"""
offset = self._row_slice.start
if offset is None:
offset = 0
return offset
@property
def col_offset(self):
"""
Offset to apply to column indices to move into clipped array
Returns
-------
int
"""
offset = self._col_slice.start
if offset is None:
offset = 0
return offset
@property
def row(self):
"""
Row index inside clipped array
Returns
-------
int
"""
return self._start_indices[0]
@property
def col(self):
"""
Column index inside clipped array
Returns
-------
int
"""
return self._start_indices[1]
@property
def cost(self):
"""
Tie line costs array
Returns
-------
ndarray
"""
return self._cost
@property
def mcp_cost(self):
"""
Tie line costs array with barrier costs applied for MCP analysis
Returns
-------
ndarray
"""
return self._mcp_cost
@property
def mcp(self):
"""
MCP_Geometric instance initialized on mcp_cost array with
starting point at sc_point
Returns
-------
MCP_Geometric
"""
if self._mcp is None:
check = self.mcp_cost[self.row, self.col]
if check < 0:
msg = ("Start idx {} does not have a valid cost!"
.format((self.row, self.col)))
raise InvalidMCPStartValueError(msg)
logger.debug('Building MCP instance for size {}'
.format(self.mcp_cost.shape))
self._mcp = MCP_Geometric(self.mcp_cost)
self._mcp.find_costs(starts=[(self.row, self.col)])
return self._mcp
@property
def capacity_class(self):
"""
SC point capacity class
Returns
-------
str
"""
return self._capacity_class
@property
def clip_shape(self):
"""
Shaped of clipped cost raster
Returns
-------
tuple
"""
if self._clip_shape is None:
if self._row_slice == slice(None):
row_shape = self._full_shape[0]
else:
row_max = (self._row_slice.stop if self._row_slice.stop
else self._full_shape[0])
row_min = (self._row_slice.start if self._row_slice.start
else 0)
row_shape = row_max - row_min
if self._col_slice == slice(None):
col_shape = self._full_shape[1]
else:
col_max = (self._col_slice.stop if self._col_slice.stop
else self._full_shape[1])
col_min = (self._col_slice.start if self._col_slice.start
else 0)
col_shape = col_max - col_min
self._clip_shape = (row_shape, col_shape)
return self._clip_shape
@staticmethod
def _parse_config(xmission_config=None):
"""
Load Xmission config if needed
Parameters
----------
config : str | dict | XmissionConfig, optional
Path to Xmission config .json, dictionary of Xmission config
.jsons, or preloaded XmissionConfig objects, by default None
Returns
-------
XmissionConfig
"""
if not isinstance(xmission_config, XmissionConfig):
xmission_config = XmissionConfig(config=xmission_config)
return xmission_config
@staticmethod
def _clip_costs(cost_fpath, cost_layer, row_slice, col_slice,
barrier_mult=100):
"""
Extract clipped cost arrays from exclusion .h5 files
Parameters
----------
cost_fpath : str
Full path of .h5 file with cost arrays
cost_layer : str
Name of cost layer to extract
row_slice : slice
slice along axis 0 (rows) to clip costs too
col_slice : slice
slice along axis 1 (columns) to clip costs too
barrier_mult : int, optional
Multiplier on transmission barrier costs, by default 100
Returns
-------
cost : ndarray
2d clipped array of raw tie-line costs
mcp_cost : ndarray
2d clipped array of mcp cost = cost * barrier * barrier_mult
"""
with ExclusionLayers(cost_fpath) as f:
cost = f[cost_layer, row_slice, col_slice]
barrier = f['transmission_barrier', row_slice, col_slice]
mcp_cost = cost + cost * barrier * barrier_mult
mcp_cost = np.where(mcp_cost < 0, -1, mcp_cost)
return cost, mcp_cost
@staticmethod
def _compute_path_length(indices):
"""
Compute the total length and cell by cell length of the lease
cost path defined by 'indices'
Parameters
----------
indices : ndarray
n x 2 array of MCP traceback of least cost path
Returns
-------
length : float
Total length of path in km
lens : ndarray
Vector of the distance of the least cost path across each
cell
"""
# Use Pythagorean theorem to calculate lengths between cells (km)
# Use c**2 = a**2 + b**2 to determine length of individual paths
lens = np.sqrt(np.sum(np.diff(indices, axis=0)**2, axis=1))
length = np.sum(lens) * 90 / 1000
# Need to determine distance coming into and out of any cell. Assume
# paths start and end at the center of a cell. Therefore, distance
# traveled in the cell is half the distance entering it and half the
# distance exiting it. Duplicate all lengths, pad 0s on ends for start
# and end cells, and divide all distance by half.
lens = np.repeat(lens, 2)
lens = np.insert(np.append(lens, 0), 0, 0)
lens = lens / 2
# Group entrance and exits distance together, and add them
lens = lens.reshape((int(lens.shape[0] / 2), 2))
lens = np.sum(lens, axis=1)
return length, lens
[docs] def least_cost_path(self, end_idx, save_path=False):
"""
Find least cost path, its length, and its total (un-barriered)
cost
Parameters
----------
end_idx : Tuple[int, int]
(row, col) index of end point to connect and compute least
cost path to.
save_path : bool
Flag to save least cost path as a multi-line geometry.
By default, ``False``.
Returns
-------
length : float
Length of path (km).
cost : float
Cost of path including terrain and land use multipliers, but
not barrier costs.
poi_lat, poi_lon : numpy.float64
Latitude and longitude of the `end_idx` of the least cost
path (i.e. the POI/transmission feature that was connected
to).
path : shapely.geometry.linestring, optional
Path as a LineString, if `save_path` was set to ``True``.
"""
row, col = end_idx
check = (row < 0 or col < 0 or row >= self.clip_shape[0]
or col >= self.clip_shape[1])
if check:
msg = ('End point ({}, {}) is out side of clipped cost raster '
'with shape {}'.format(row, col, self.clip_shape))
logger.exception(msg)
raise ValueError(msg)
check = self.mcp_cost[row, col]
if check < 0:
msg = ("End idx {} does not have a valid cost!"
.format(end_idx))
raise LeastCostPathNotFoundError(msg)
try:
indices = np.array(self.mcp.traceback((row, col)))
except ValueError as ex:
msg = ('Unable to find path from start {} to {}: {}'
.format((self.row, self.col), end_idx, ex))
raise LeastCostPathNotFoundError(msg) from ex
# Extract costs of cells
# pylint: disable=unsubscriptable-object
cell_costs = self.cost[indices[:, 0], indices[:, 1]]
length, lens = self._compute_path_length(indices)
# Multiple distance travel through cell by cost and sum it!
cost = np.sum(cell_costs * lens)
with ExclusionLayers(self._cost_fpath) as f:
poi_lat = f['latitude', self._row_slice, self._col_slice][row, col]
poi_lon = (
f['longitude', self._row_slice, self._col_slice][row, col])
if save_path:
row = indices[:, 0] + self.row_offset
col = indices[:, 1] + self.col_offset
x, y = rasterio.transform.xy(self.transform, row, col)
geom = Point if indices.shape[0] == 1 else LineString
out = length, cost, poi_lat, poi_lon, geom(list(zip(x, y)))
else:
out = length, cost, poi_lat, poi_lon
return out
[docs] def compute(self, end_indices, save_paths=False):
"""
Compute least cost paths to given end indices
Parameters
----------
end_indices : tuple | list
(row, col) index or list of (row, col) indices of end
point(s) to connect and compute least cost path to
save_paths : bool, optional
Flag to save least cost path as a multi-line geometry,
by default False
Returns
-------
tie_lines : pandas.DataFrame | gpd.GeoDataFrame
DataFrame of lengths and costs for each path or GeoDataFrame
of length, cost, and geometry for each path
"""
if isinstance(end_indices, tuple):
end_indices = [end_indices]
lengths = []
costs = []
paths = []
poi_lats = []
poi_lons = []
for end_idx in end_indices:
out = self.least_cost_path(end_idx, save_path=save_paths)
lengths.append(out[0])
costs.append(out[1])
poi_lats.append(out[2])
poi_lons.append(out[3])
if save_paths:
paths.append(out[4])
tie_lines = pd.DataFrame({'length_km': lengths, 'cost': costs,
'poi_lat': poi_lats, 'poi_lon': poi_lons})
if save_paths:
with ExclusionLayers(self._cost_fpath) as f:
crs = f.crs
tie_lines = gpd.GeoDataFrame(tie_lines, geometry=paths, crs=crs)
return tie_lines
[docs] @classmethod
def run(cls, cost_fpath, start_indices, end_indices, capacity_class,
row_slice, col_slice, xmission_config=None, barrier_mult=100,
save_paths=False):
"""
Compute least cost tie-line path to all features to be connected
a single supply curve point.
Parameters
----------
cost_fpath : str
Full path to HDF5 file containing cost arrays. The cost data
layers should be named ``"tie_line_costs_{capacity}MW"``,
where ``capacity`` is an integer value representing the
capacity of the line (the integer values must matches at
least one of the integer capacity values represented by the
keys in the ``power_classes`` portion of the transmission
config).
start_indices : tuple
Tuple of (row_idx, col_idx) in the **clipped** cost array
(see `row_slice` and `col_slice` definitions below)
indicating the start position of all paths to compute
(typically, this is the centroid of the supply curve cell).
Paths will be computed from this start location to each of
the ``end_indices``, which are also locations in the cost
array (typically transmission feature locations).
capacity_class : : int | str
Integer value representing the transmission feature
capacity. The integer values must matches at least one of
the integer capacity values represented by the keys in the
``power_classes`` portion of the transmission config.
row_slice, col_slice : slice
Slices into the cost raster array used to clip the area that
should be considered when computing a least cost path. This
can be used to limit the consideration space and speed up
computation. Note that the start and end indices must
be given w.r.t. the cost raster that is "clipped" using
these slices.
xmission_config : str | dict | XmissionConfig, optional
Path to transmission config JSON files, dictionary of
transmission config JSONs, or preloaded XmissionConfig
objects. If ``None``, the default config is used.
By default, ``None``.
barrier_mult : int, optional
Multiplier applied to the cost data to create transmission
barrier costs. By default, ``100``.
save_paths : bool, optional
Flag to save least cost path as a multi-line geometry.
By default, ``False``.
Returns
-------
tie_lines : pandas.DataFrame | gpd.GeoDataFrame
DataFrame of lengths and costs for each path or GeoDataFrame
of length, cost, and geometry for each path
"""
ts = time.time()
tlc = cls(cost_fpath, start_indices, capacity_class, row_slice,
col_slice, xmission_config=xmission_config,
barrier_mult=barrier_mult)
tie_lines = tlc.compute(end_indices, save_paths=save_paths)
logger.debug('Least Cost tie-line computed in {:.4f} min'
.format((time.time() - ts) / 60))
return tie_lines
[docs]class TransCapCosts(TieLineCosts):
"""
Compute total transmission capital cost
(least-cost tie-line cost + connection cost) for all features to be
connected a single supply curve point
"""
def __init__(self, cost_fpath, sc_point, features, capacity_class,
radius=None, xmission_config=None, barrier_mult=100):
"""
Parameters
----------
cost_fpath : str
Full path of .h5 file with cost arrays
sc_point : gpd.GeoSeries
Supply Curve Point meta data
features : pandas.DataFrame
Table of transmission features
capacity_class : int | str
Transmission feature capacity_class class
radius : int, optional
Radius around sc_point to clip cost to, by default None
xmission_config : str | dict | XmissionConfig, optional
Path to Xmission config .json, dictionary of Xmission config
.jsons, or preloaded XmissionConfig objects, by default None
barrier_mult : int, optional
Multiplier on transmission barrier costs, by default 100
"""
self._sc_point = sc_point
start_indices, row_slice, col_slice = self._get_clipping_slices(
cost_fpath, sc_point[['row', 'col']].values, radius=radius)
super().__init__(cost_fpath, start_indices, capacity_class, row_slice,
col_slice, xmission_config=xmission_config,
barrier_mult=barrier_mult)
self._features = self._prep_features(features)
self._clip_mask = None
@property
def sc_point(self):
"""
Supply curve point data:
- gid
- lat
- lon
- idx (row, col)
Returns
-------
pandas.Series
"""
return self._sc_point
@property
def sc_point_gid(self):
"""
Supply curve point gid
Returns
-------
int
"""
return self.sc_point['sc_point_gid']
@property
def features(self):
"""
Table of transmission features
Returns
-------
pandas.DataFrame
"""
return self._features
@property
def clip_mask(self):
"""
Polygon used to clip transmission lines to the clipped raster
bounds
Returns
-------
shapely.Polygon
"""
if self._clip_mask is None:
# pylint: disable=using-constant-test
row_bounds = [self._row_slice.start
if self._row_slice.start else 0,
self._row_slice.stop - 1
if self._row_slice.stop else self.clip_shape[0] - 1]
col_bounds = [self._col_slice.start
if self._col_slice.start else 0,
self._col_slice.stop - 1
if self._col_slice.stop else self.clip_shape[1] - 1]
x, y = rasterio.transform.xy(self.transform, row_bounds,
col_bounds)
self._clip_mask = Polygon([[x[0], y[0]],
[x[1], y[0]],
[x[1], y[1]],
[x[0], y[1]],
[x[0], y[0]]])
return self._clip_mask
@property
def tie_line_voltage(self):
"""
Tie line voltage in kV
Returns
-------
int
"""
return self._config.capacity_to_kv(self.capacity_class)
@staticmethod
def _get_clipping_slices(cost_fpath, sc_point_idx, radius=None):
"""
Get array slices for clipped area around SC point (row, col)
index
Parameters
----------
cost_fpath : str
Full path of .h5 file with cost arrays
row : int
SC point row index
col : int
SC point column index
radius : int, optional
Radius around sc_point to clip cost to, by default None
Returns
-------
start_indices : tuple
Start index in clipped raster space
row_slice : slice
Row start, stop indices for clipped cost array
col_slice : slice
Column start, stop indices for clipped cost array
"""
with ExclusionLayers(cost_fpath) as f:
shape = f.shape
if radius is not None:
row, col = sc_point_idx
row_min = max(row - radius, 0)
row_max = min(row + radius, shape[0])
col_min = max(col - radius, 0)
col_max = min(col + radius, shape[1])
start_indices = (row - row_min, col - col_min)
else:
start_indices = sc_point_idx
row_min, row_max = None, None
col_min, col_max = None, None
row_slice = slice(row_min, row_max)
col_slice = slice(col_min, col_max)
return start_indices, row_slice, col_slice
@staticmethod
def _calc_xformer_cost(features, tie_line_voltage, config=None):
"""
Compute transformer costs in $/MW for needed features, all
others will be 0
Parameters
----------
features : pd.DataFrame
Table of transmission features to compute transformer costs
for
tie_line_voltage : int
Tie-line voltage in kV
config : str | dict | XmissionConfig
Transmission configuration
Returns
-------
xformer_costs : ndarray
vector of $/MW transformer costs
"""
if not isinstance(config, XmissionConfig):
config = XmissionConfig(config=config)
mask = features['category'] == SUBSTATION_CAT
mask &= features['max_volts'] < tie_line_voltage
if np.any(mask):
msg = ('Voltages for substations {} do not exceed tie-line '
'voltage of {}'
.format(features.loc[mask, 'trans_gid'].values,
tie_line_voltage))
logger.error(msg)
raise RuntimeError(msg)
xformer_costs = np.zeros(len(features))
for volts, df in features.groupby('min_volts'):
idx = df.index.values
xformer_costs[idx] = config.xformer_cost(volts, tie_line_voltage)
mask = features['category'] == TRANS_LINE_CAT
mask &= features['max_volts'] < tie_line_voltage
xformer_costs[mask] = 0
mask = features['min_volts'] <= tie_line_voltage
xformer_costs[mask] = 0
mask = features['region'] == config['iso_lookup']['TEPPC']
xformer_costs[mask] = 0
return xformer_costs
@staticmethod
def _calc_sub_upgrade_cost(features, tie_line_voltage, config=None):
"""
Compute substation upgrade costs for needed features, all others
will be 0
Parameters
----------
features : pd.DataFrame
Table of transmission features to compute transformer costs
for
tie_line_voltage : int
Tie-line voltage in kV
config : str | dict | XmissionConfig
Transmission configuration
Returns
-------
sub_upgrade_costs : ndarray
Substation upgrade costs in $
"""
if not isinstance(config, XmissionConfig):
config = XmissionConfig(config=config)
sub_upgrade_cost = np.zeros(len(features))
if np.any(features['region'] == 0):
mask = features['region'] == 0
msg = ('Features {} have an invalid region! Region must != 0!'
.format(features.loc[mask, 'trans_gid'].values))
logger.error(msg)
raise RuntimeError(msg)
mask = features['category'].isin([SUBSTATION_CAT, LOAD_CENTER_CAT])
if np.any(mask):
for region, df in features.loc[mask].groupby('region'):
idx = df.index.values
sub_upgrade_cost[idx] = config.sub_upgrade_cost(
region, tie_line_voltage)
return sub_upgrade_cost
@staticmethod
def _calc_new_sub_cost(features, tie_line_voltage, config=None):
"""
Compute new substation costs for needed features, all others
will be 0
Parameters
----------
features : pd.DataFrame
Table of transmission features to compute transformer costs
for
tie_line_voltage : int
Tie-line voltage in kV
config : str | dict | XmissionConfig
Transmission configuration
Returns
-------
new_sub_cost : ndarray
new substation costs in $
"""
if not isinstance(config, XmissionConfig):
config = XmissionConfig(config=config)
new_sub_cost = np.zeros(len(features))
if np.any(features['region'] == 0):
mask = features['region'] == 0
msg = ('Features {} have an invalid region! Region must != 0!'
.format(features.loc[mask, 'trans_gid'].values))
logger.error(msg)
raise RuntimeError(msg)
mask = features['category'] == TRANS_LINE_CAT
if np.any(mask):
for region, df in features.loc[mask].groupby('region'):
idx = df.index.values
new_sub_cost[idx] = config.new_sub_cost(
region, tie_line_voltage)
return new_sub_cost
def _prep_features(self, features):
"""
Shift feature row and col indices of transmission features from
the global domain to the clipped raster, clip transmission lines
to clipped array and find nearest point
Parameters
----------
features : pandas.DataFrame
Table of transmission features
clip_lines : bool, optional
Flag to clip transmission lines to clipped raster bounds,
set to false when clipping radius is None,
by default True
Returns
-------
features : pandas.DataFrame
Transmission features with row/col indices shifted to
clipped raster
"""
mapping = {'gid': 'trans_gid', 'trans_gids': 'trans_line_gids'}
features = features.rename(columns=mapping).drop(columns='dist',
errors='ignore')
if self.row_offset is not None:
features['row'] -= self.row_offset
if self.col_offset is not None:
features['col'] -= self.col_offset
return features.reset_index(drop=True)
def _get_trans_line_idx(self, trans_line, clip=False):
"""
Map the nearest point on each transmission lines to the cost
raster
Parameters
----------
trans_lines : GeoPandas.GeoSeries
Transmission lines to be connected to each supply curve
point, the nearest point on each line needs to be mapped to
the cost raster grid in order to compute the least cost path
clip : bool
Flag to clip the transmission lines to the cost raster
domain
Returns
-------
[row, col] : list
Row, col index of the nearest point on the transmission line
to the supply curve point, used for least cost path
"""
if clip:
logger.debug("Clipping transmission line {} to raster domain"
.format(trans_line['trans_gid']))
clipped_trans_line = {'geometry': trans_line['geometry']}
clipped_trans_line = gpd.clip(gpd.GeoSeries(clipped_trans_line),
self.clip_mask)
if len(clipped_trans_line) == len(trans_line):
trans_line = clipped_trans_line
point, _ = nearest_points(trans_line['geometry'],
self.sc_point['geometry'])
row, col = rasterio.transform.rowcol(self.transform, point.x, point.y)
row -= self.row_offset
col -= self.col_offset
if not clip:
clip = (row < 0 or row >= self.clip_shape[0]
or col < 0 or col >= self.clip_shape[1])
if clip:
row, col = self._get_trans_line_idx(trans_line, clip=clip)
else:
row = min(max(row, 0), self.clip_shape[0] - 1)
col = min(max(col, 0), self.clip_shape[1] - 1)
return [row, col]
[docs] def compute_tie_line_costs(self, min_line_length=5.7, # noqa: C901
save_paths=False):
"""
Compute least cost path and distance between supply curve point
and every transmission feature
Parameters
----------
min_line_length : float, optional
Minimum line length in km, by default 5.7
save_paths : bool, optional
Flag to save least cost path as a multi-line geometry,
by default False
Returns
-------
tie_line_costs : gpd.GeoDataFrame
Updated table of transmission features with the tie-line
cost and distance added
"""
tie_voltage = self.tie_line_voltage
features = self.features.copy()
features['raw_line_cost'] = None
features['dist_km'] = None
if save_paths:
paths = []
logger.debug('Determining path lengths and costs')
for index, feat in features.iterrows():
if feat['category'] == TRANS_LINE_CAT:
t_line = True
feat_idx = self._get_trans_line_idx(feat)
else:
t_line = False
feat_idx = feat[['row', 'col']].values
try:
# pylint: disable=unbalanced-tuple-unpacking
result = self.least_cost_path(feat_idx, save_path=save_paths)
if save_paths:
(length, cost, poi_lat, poi_lon, path) = result
else:
(length, cost, poi_lat, poi_lon) = result
if t_line and feat['max_volts'] < tie_voltage:
msg = ('Tie-line {} voltage of {}kV is less than tie line '
'voltage of {}kV.'
.format(feat['trans_gid'], feat['max_volts'],
tie_voltage))
logger.debug(msg)
cost = 1e12
if length < min_line_length:
msg = ('Tie-line length {} will be increased to the '
'minimum allowed line length: {}.'
.format(length, min_line_length))
logger.debug(msg)
min_mult = (1 if np.isclose(length, 0)
else min_line_length / length)
cost = cost * min_mult
length = min_line_length
features.loc[index, 'dist_km'] = length
features.loc[index, 'raw_line_cost'] = cost
features.loc[index, 'poi_lat'] = poi_lat
features.loc[index, 'poi_lon'] = poi_lon
if save_paths:
paths.append(path)
except LeastCostPathNotFoundError as ex:
msg = ("Could not connect SC point {} to transmission feature "
"{}: {}"
.format(self.sc_point_gid, feat['trans_gid'], ex))
logger.debug(msg)
if t_line:
features.loc[index, 'raw_line_cost'] = 1e12
if save_paths:
paths.append(self._sc_point.geometry)
except InvalidMCPStartValueError:
raise
except Exception:
logger.exception('Could not connect SC point {} to '
'transmission features!'
.format(self.sc_point_gid))
raise
if save_paths:
with ExclusionLayers(self._cost_fpath) as el:
crs = el.crs
features = gpd.GeoDataFrame(features, geometry=paths, crs=crs)
return features
[docs] def compute_connection_costs(self, features=None):
"""
Calculate connection costs for tie lines
Returns
-------
features : pd.DataFrame
Updated table of transmission features with the connection
costs added
"""
if features is None:
features = self.features.copy()
features = features.reset_index(drop=True)
# Length multiplier
features['length_mult'] = 1.0
# Short cutoff
mask = features['dist_km'] < 3 * 5280 / 3.28084 / 1000
features.loc[mask, 'length_mult'] = 1.5
# Medium cutoff
mask = features['dist_km'] <= 10 * 5280 / 3.28084 / 1000
features.loc[mask, 'length_mult'] = 1.2
features['tie_line_cost'] = (features['raw_line_cost']
* features['length_mult'])
# Transformer costs
features['xformer_cost_per_mw'] = self._calc_xformer_cost(
features, self.tie_line_voltage, config=self._config)
capacity = int(self.capacity_class.strip('MW'))
features['xformer_cost'] = (features['xformer_cost_per_mw']
* capacity)
# Substation costs
features['sub_upgrade_cost'] = self._calc_sub_upgrade_cost(
features, self.tie_line_voltage, config=self._config)
features['new_sub_cost'] = self._calc_new_sub_cost(
features, self.tie_line_voltage, config=self._config)
# Sink costs
mask = features['category'] == SINK_CAT
features.loc[mask, 'new_sub_cost'] = 1e11
# Total cost
features['connection_cost'] = (features['xformer_cost']
+ features['sub_upgrade_cost']
+ features['new_sub_cost'])
return features
[docs] def compute(self, min_line_length=5.7, save_paths=False,
simplify_geo=None):
"""
Compute Transmission capital cost of connecting SC point to
transmission features.
trans_cap_cost = tie_line_cost + connection_cost
Parameters
----------
min_line_length : float, optional
Minimum line length in km, by default 5.7
save_paths : bool, optional
Flag to save least cost path as a multi-line geometry,
by default False
simplify_geo : float | None, optional
If float, simplify geometries using this value
Returns
-------
features : pd.DataFrame | gpd.GeoDataFrame
Transmission table with tie-line costs and distances and
connection costs added. Includes paths if
``save_paths == True``
"""
features = self.compute_tie_line_costs(min_line_length=min_line_length,
save_paths=save_paths)
mask = features['raw_line_cost'].isna()
if mask.any():
msg = ("The following features could not be connected to SC point "
"{}:\n{}".format(self.sc_point_gid,
features.loc[mask, 'trans_gid']))
logger.warning(msg)
warn(msg)
features = features.loc[~mask]
features = self.compute_connection_costs(features=features)
features['trans_cap_cost'] = (features['tie_line_cost']
+ features['connection_cost'])
drop_cols = ['row', 'col']
if not save_paths:
drop_cols.append('geometry')
features = features.drop(columns=drop_cols,
errors='ignore').reset_index(drop=True)
features['sc_row_ind'] = self.sc_point['sc_row_ind']
features['sc_col_ind'] = self.sc_point['sc_col_ind']
features['sc_point_gid'] = self.sc_point_gid
if save_paths and simplify_geo:
features.geometry = features.geometry.simplify(simplify_geo)
return features
[docs] @classmethod
def run(cls, cost_fpath, sc_point, features, capacity_class, radius=None,
xmission_config=None, barrier_mult=100, min_line_length=5.7,
save_paths=False, simplify_geo=None):
"""
Compute Transmission capital cost of connecting SC point to
transmission features.
trans_cap_cost = tie_line_cost + connection_cost
Parameters
----------
cost_fpath : str
Full path of .h5 file with cost arrays
sc_point : gpd.GeoSeries
Supply Curve Point meta data
features : pandas.DataFrame
Table of transmission features
capacity_class : int | str
Transmission feature capacity_class class
radius : int, optional
Radius around sc_point to clip cost to, by default None
xmission_config : str | dict | XmissionConfig, optional
Path to Xmission config .json, dictionary of Xmission config
.jsons, or preloaded XmissionConfig objects, by default None
barrier_mult : int, optional
Multiplier on transmission barrier costs, by default 100
min_line_length : float, optional
Minimum line length in km, by default 5.7
save_paths : bool, optional
Flag to save least cost path as a multi-line geometry,
by default False
simplify_geo : float | None, optional
If float, simplify geometries using this value
Returns
-------
features : pd.DataFrame | gpd.GeoDataFrame | None
Transmission table with tie-line costs and distances and
connection costs added. Will include paths if
``save_paths == True``
"""
ts = time.time()
logger.debug('Processing sc_point {}, {}, save_paths={}'
.format(sc_point.sc_point_gid, sc_point.geometry,
save_paths))
try:
tcc = cls(cost_fpath, sc_point, features, capacity_class,
radius=radius, xmission_config=xmission_config,
barrier_mult=barrier_mult)
features = tcc.compute(min_line_length=min_line_length,
save_paths=save_paths,
simplify_geo=simplify_geo)
logger.debug('Least Cost transmission costs computed for '
'SC point {} in {:.4f}s'
.format(tcc.sc_point_gid, time.time() - ts))
except InvalidMCPStartValueError as ex:
features = None
msg = ('Could not connect SC point {} to transmission features: {}'
.format(sc_point['sc_point_gid'], ex))
logger.debug(msg)
except Exception as ex:
msg = ('Failed to connect SC point {} to transmission features: {}'
.format(sc_point['sc_point_gid'], ex))
logger.exception(msg)
raise
return features
[docs]class ReinforcementLineCosts(TieLineCosts):
"""
Compute Least Cost Reinforcement Line cost from substations to
network nodes.
The reinforcement line path will attempt to follow existing
transmission lines for as long as possible. Costs are calculated
using half of the greenfield cost of the transmission line that is
being traced. If the reinforcement path travels along two different
line voltages, corresponding costs are used for each portion of the
path. In the case that the path must cross a region with no existing
transmission lines to reach the destination, half (50%) of the
greenfield cost of the input ``capacity_class`` is used.
"""
def __init__(self, transmission_lines, cost_fpath, start_indices,
capacity_class, row_slice, col_slice, xmission_config=None,
barrier_mult=100):
"""
Parameters
----------
transmission_lines : dict
Dictionary where the keys are the names of cost layers in
the cost HDF5 file and values are arrays with the
corresponding existing transmission lines rastered into
them (i.e. array value is 1 at a pixel if there is a
transmission line, otherwise 0). These arrays will be used
to compute the reinforcement costs along existing
transmission lines of differing voltages.
cost_fpath : str
Full path of .h5 file with cost arrays.
start_indices : tuple
Tuple of (row_idx, col_idx) in the cost array indicating the
start position of all reinforcement line paths to compute
(typically, this is the location of the network node in the
BA). Paths will be computed from this start location to each
of the `end_indices`, which are also locations in the cost
array (typically substations within the BA of the network
node).
capacity_class : int | str
Transmission feature ``capacity_class`` to use for the
'base' greenfield costs. 'Base' greenfield costs are only
used if the reinforcement path *must* deviate from existing
transmission lines. Typically, a capacity class of 400 MW
(230kV transmission line) is used for the base greenfield
costs.
row_slice, col_slice : slice
Row and column slices into the cost array representing the
window to compute reinforcement line path within.
xmission_config : str | dict | XmissionConfig, optional
Path to Xmission config .json, dictionary of Xmission config
.jsons, or preloaded XmissionConfig objects.
By default, ``None``.
barrier_mult : int, optional
Multiplier on transmission barrier costs.
By default, ``100``.
"""
super().__init__(cost_fpath=cost_fpath, start_indices=start_indices,
capacity_class=capacity_class, row_slice=row_slice,
col_slice=col_slice, xmission_config=xmission_config,
barrier_mult=barrier_mult)
self._cost = self._cost / self._line_cap_mw
with ExclusionLayers(cost_fpath) as f:
for capacity_mw, lines in transmission_lines.items():
t_lines = np.where(lines[row_slice, col_slice])
cost_layer = 'tie_line_costs_{}MW'.format(capacity_mw)
costs = f[cost_layer, row_slice, col_slice][t_lines]
self._mcp_cost[t_lines] = costs * 1e-9
self._cost[t_lines] = costs / capacity_mw
[docs] @classmethod
def run(cls, transmission_lines, cost_fpath, start_indices, end_indices,
capacity_class, row_slice, col_slice, xmission_config=None,
barrier_mult=100, save_paths=False):
"""
Compute reinforcement line path to all features to be connected
a single supply curve point.
Parameters
----------
transmission_lines : dict
Dictionary where the keys are the names of cost layers in
the cost HDF5 file and values are arrays with the
corresponding existing transmission lines rastered into
them (i.e. array value is 1 at a pixel if there is a
transmission line, otherwise 0). These arrays will be used
to compute the reinforcement costs along existing
transmission lines of differing voltages.
cost_fpath : str
Full path of .h5 file with cost arrays.
start_indices : tuple
Tuple of (row_idx, col_idx) in the cost array indicating the
start position of all reinforcement line paths to compute
(typically, this is the location of the network node in the
BA). Paths will be computed from this start location to each
of the `end_indices`, which are also locations in the cost
array (typically substations within the BA of the network
node).
end_indices : tuple | list
Tuple (row, col) index or list of (row, col) indices in the
cost array indicating the end location(s) to compute
reinforcement line paths to (typically substations within a
single BA). Paths are computed from the `start_indices`
(typically the network node of the BA) to each of the
individual pairs of `end_indices`.
capacity_class : int | str
Transmission feature ``capacity_class`` to use for the
'base' greenfield costs. 'Base' greenfield costs are only
used if the reinforcement path *must* deviate from existing
transmission lines. Typically, a capacity class of 400 MW
(230kV transmission line) is used for the base greenfield
costs.
row_slice, col_slice : slice
Row and column slices into the cost array representing the
window to compute reinforcement line path within.
xmission_config : str | dict | XmissionConfig, optional
Path to Xmission config .json, dictionary of Xmission config
.jsons, or preloaded XmissionConfig objects.
By default, ``None``.
barrier_mult : int, optional
Multiplier on transmission barrier costs.
By default, ``100``.
save_paths : bool, optional
Flag to save reinforcement line path as a multi-line
geometry. By default, ``False``.
Returns
-------
tie_lines : pandas.DataFrame | gpd.GeoDataFrame
DataFrame of lengths and costs for each reinforcement line
path or GeoDataFrame of length, cost, and geometry for each
reinforcement line path.
"""
ts = time.time()
tlc = cls(transmission_lines, cost_fpath, start_indices,
capacity_class, row_slice, col_slice,
xmission_config=xmission_config, barrier_mult=barrier_mult)
tie_lines = tlc.compute(end_indices, save_paths=save_paths)
tie_lines['cost'] = tie_lines['cost'] * 0.5
row, col = start_indices
with ExclusionLayers(cost_fpath) as f:
tie_lines['poi_lat'] = (
f['latitude', row_slice, col_slice][row, col])
tie_lines['poi_lon'] = (
f['longitude', row_slice, col_slice][row, col])
tie_lines = tie_lines.rename({'length_km': 'reinforcement_dist_km',
'cost': 'reinforcement_cost_per_mw',
'poi_lat': 'reinforcement_poi_lat',
'poi_lon': 'reinforcement_poi_lon'},
axis=1)
logger.debug('Reinforcement Path Cost computed in {:.4f} min'
.format((time.time() - ts) / 60))
return tie_lines