Source code for reVX.least_cost_xmission.least_cost_xmission

# -*- coding: utf-8 -*-
"""
Module to compute least cost transmission paths, distances, AND costs
for one or more SC points
"""
from warnings import warn
import geopandas as gpd
import json
import logging
import numpy as np
import os
import pandas as pd
import rasterio
from pyproj.crs import CRS
from scipy.spatial import cKDTree
from shapely.geometry import Point
from concurrent.futures import as_completed
import time

from reV.handlers.exclusions import ExclusionLayers
from reV.supply_curve.extent import SupplyCurveExtent
from rex.utilities.execution import SpawnProcessPool
from rex.utilities.loggers import log_mem

from reVX.least_cost_xmission.config import (TRANS_LINE_CAT, LOAD_CENTER_CAT,
                                             SINK_CAT, SUBSTATION_CAT)
from reVX.least_cost_xmission.least_cost_paths import LeastCostPaths
from reVX.least_cost_xmission.trans_cap_costs import TransCapCosts

logger = logging.getLogger(__name__)


[docs]class LeastCostXmission(LeastCostPaths): """ Compute Least Cost tie-line paths and full transmission cap cost for all possible connections to all supply curve points """ REQUIRED_LAYERS = ['transmission_barrier', 'ISO_regions'] def __init__(self, cost_fpath, features_fpath, resolution=128, xmission_config=None, min_line_length=0): """ Parameters ---------- cost_fpath : str Path to h5 file with cost rasters and other required layers features_fpath : str Path to GeoPackage with transmission features resolution : int, optional SC point resolution, by default 128 xmission_config : str | dict | XmissionConfig, optional Path to Xmission config .json, dictionary of Xmission config .jsons, or preloaded XmissionConfig objects, by default None min_line_length : int | float, optional Minimum line length in km, by default 0. """ self._check_layers(cost_fpath) self._config = TransCapCosts._parse_config( xmission_config=xmission_config) (self._sc_points, self._features, self._sub_lines_mapping, self._shape) =\ self._map_to_costs(cost_fpath, features_fpath, resolution=resolution) self._cost_fpath = cost_fpath self._tree = None self._sink_coords = None self._min_line_len = min_line_length logger.debug('{} initialized'.format(self)) def __repr__(self): msg = ("{} to be computed for {} sc_points and {} features" .format(self.__class__.__name__, len(self.sc_points), len(self.features))) return msg @property def sc_points(self): """ Table of supply curve points Returns ------- gpd.GeoDataFrame """ return self._sc_points @property def features(self): """ Table of features to compute paths for Returns ------- pandas.DataFrame """ return self._features @property def sub_lines_mapping(self): """ Series mapping substations to the transmission lines connected to each substation Returns ------- pandas.Series """ return self._sub_lines_mapping @property def sink_coords(self): """ Inf sink coordinates (row, col) Returns ------- ndarray """ if self._sink_coords is None: mask = self.features['category'] == SINK_CAT self._sink_coords = self.features.loc[mask, ['row', 'col']].values return self._sink_coords @property def sink_tree(self): """ cKDTree for infinite sinks Returns ------- cKDTree """ if self._tree is None: self._tree = cKDTree(self.sink_coords) return self._tree @staticmethod def _load_trans_feats(features_fpath): """ Load existing transmission features from disk. Substations will be loaded from cache file if it exists Parameters ---------- features_fpath : str Path to GeoPackage with trans features Returns ------- features : gpd.GeoDataFrame DataFrame of transmission features sub_line_map : pandas.Series Mapping of sub-station trans_gid to connected transmission line trans_gids """ logger.debug('Loading transmission features') features = gpd.read_file(features_fpath) features = features.drop(columns=['bgid', 'egid', 'cap_left'], errors='ignore') mapping = {'gid': 'trans_gid', 'trans_gids': 'trans_line_gids'} features = features.rename(columns=mapping) features['min_volts'] = 0 features['max_volts'] = 0 # Transmission lines mask = features['category'] == TRANS_LINE_CAT voltage = features.loc[mask, 'voltage'].values features.loc[mask, 'min_volts'] = voltage features.loc[mask, 'max_volts'] = voltage # Load Center and Sinks mask = features['category'].isin([LOAD_CENTER_CAT, SINK_CAT]) features.loc[mask, 'min_volts'] = 1 features.loc[mask, 'max_volts'] = 9999 sub_lines_map = {} mask = features['category'] == SUBSTATION_CAT bad_subs = np.zeros(len(features), dtype=bool) for idx, row in features.loc[mask].iterrows(): gid = row['trans_gid'] lines = row['trans_line_gids'] if isinstance(lines, str): lines = json.loads(lines) sub_lines_map[gid] = lines lines_mask = features['trans_gid'].isin(lines) voltage = features.loc[lines_mask, 'voltage'].values if np.max(voltage) >= 69: features.loc[idx, 'min_volts'] = np.min(voltage) features.loc[idx, 'max_volts'] = np.max(voltage) else: bad_subs[idx] = True if any(bad_subs): msg = ("The following sub-stations do not have the minimum " "required voltage of 69 kV and will be dropped:\n{}" .format(features.loc[bad_subs, 'trans_gid'])) logger.warning(msg) warn(msg) features = features.loc[~bad_subs].reset_index(drop=True) return features, pd.Series(sub_lines_map) @staticmethod def _create_sc_points(cost_fpath, resolution=128): """ Load SC points, covert row/col to array wide, and determine x/y for reV projection Parameters ---------- cost_fpath : str Path to h5 file with cost rasters and other required layers resolution : int, optional SC point resolution, by default 128 Returns sc_points : gpd.GeoDataFrame SC points """ logger.debug('Loading Supply Curve Points for res: {}' .format(resolution)) sce = SupplyCurveExtent(cost_fpath, resolution=resolution) sc_points = sce.points.rename(columns={'row_ind': 'sc_row_ind', 'col_ind': 'sc_col_ind'}) shape = sce.excl_shape sc_points['sc_point_gid'] = sc_points.index.values row = np.round(sc_points['sc_row_ind'] * resolution + resolution / 2) row = np.where(row >= shape[0], shape[0] - 1, row) sc_points['row'] = row.astype(int) col = np.round(sc_points['sc_col_ind'] * resolution + resolution / 2) col = np.where(col >= shape[1], shape[1] - 1, col) sc_points['col'] = col.astype(int) return sc_points @staticmethod def _get_feature_cost_indices(features, crs, transform, shape): """ Map features to cost row, col indices using rasterio transform Parameters ---------- features : gpd.GeoDataFrame GeoDataFrame of features to map to cost raster crs : pyproj.crs.CRS CRS of cost raster transform : raster.Affine Transform of cost raster shape : tuple Cost raster shape Returns ------- row : ndarray Vector of row indices for each feature col : ndarray Vector of col indices for each features mask : ndarray Boolean mask of features with indices outside of cost raster """ row, col, mask = super(LeastCostXmission, LeastCostXmission)._get_feature_cost_indices( features, crs, transform, shape) t_lines = features['category'] == TRANS_LINE_CAT mask |= t_lines row[t_lines] = np.where(row[t_lines] >= 0, row[t_lines], 0) row[t_lines] = np.where(row[t_lines] < shape[0], row[t_lines], shape[0] - 1) col[t_lines] = np.where(col[t_lines] >= 0, col[t_lines], 0) col[t_lines] = np.where(col[t_lines] < shape[1], col[t_lines], shape[1] - 1) return row, col, mask @classmethod def _map_to_costs(cls, cost_fpath, features_fpath, resolution=128): """ Map supply curve points and transmission features to cost array pixel indices Parameters ---------- cost_fpath : str Path to h5 file with cost rasters and other required layers features_fpath : str Path to GeoPackage with transmission features resolution : int, optional SC point resolution, by default 128 Returns ------- sc_points : gpd.GeoDataFrame Table of supply curve points to connect to transmission features : gpd.GeoDataFrame Table of transmission features sub_lines_map : pandas.Series Series mapping substations to the transmission lines connected to each substation shape: Tuple[int, int] Shape of h5 costs raster """ with ExclusionLayers(cost_fpath) as f: crs = CRS.from_string(f.crs) transform = rasterio.Affine(*f.profile['transform']) shape = f.shape regions = f['ISO_regions'] features, sub_lines_map = cls._load_trans_feats(features_fpath) row, col, mask = cls._get_feature_cost_indices(features, crs, transform, shape) if any(~mask): msg = ("The following features are outside of the cost exclusion " "domain and will be dropped:\n{}" .format(features.loc[~mask, 'trans_gid'])) logger.warning(msg) warn(msg) row = row[mask] col = col[mask] features = features.loc[mask].reset_index(drop=True) features['row'] = row features['col'] = col features['region'] = regions[row, col] logger.debug('Converting SC points to GeoDataFrame') sc_points = cls._create_sc_points(cost_fpath, resolution=resolution) x, y = rasterio.transform.xy(transform, sc_points['row'].values, sc_points['col'].values) geo = [Point(xy) for xy in zip(x, y)] sc_points = gpd.GeoDataFrame(sc_points, crs=features.crs, geometry=geo) return sc_points, features, sub_lines_map, shape def _clip_to_sc_point(self, sc_point, tie_line_voltage, nn_sinks=2, clipping_buffer=1.05, radius=None, expand_radius=True): """ Clip costs raster to AOI around SC point, and get substations, load centers, and sinks within the clipped region. Parameters ---------- sc_point : gpd.GeoSeries SC point to clip raster around nn_sinks : int, optional Number of nearest neighbor sinks to clip to clipping_buffer : float, optional Buffer to increase clipping radius by, by default 1.05 radius : None | int, optional Force clipping radius. Transmission features beyond this radius will not be considered for connection with supply curve point. If ``None``, no radius is forced, and connections to all available transmission features are computed. By default, ``None``. expand_radius : bool, optional Option to expand radius to include at least one connection feature. Has no effect if ``radius=None``. By default, ``True``. Returns ------- radius : int Clipping radius in cost raster pixels x_feats : pd.DataFrame Substations, load centers, sinks, and nearest points on t-lines to SC point """ logger.debug('Clipping features to sc_point {}'.format(sc_point.name)) sc_features = self.features.copy(deep=True) if len(self.sink_coords) > 2 or radius: row, col = sc_point[['row', 'col']].values if radius is None: _, pos = self.sink_tree.query([row, col], k=nn_sinks) radius = np.abs(self.sink_coords[pos] - np.array([row, col]) ).max() radius = int(np.ceil(radius * clipping_buffer)) logger.debug('Radius to {} nearest sink is: {}' .format(nn_sinks, radius)) else: logger.debug('Using forced radius of {}'.format(radius)) sc_features = self._clip_to_radius(sc_point, radius, sc_features, clipping_buffer, expand_radius) mask = self.features['max_volts'] >= tie_line_voltage sc_features = sc_features.loc[mask].copy(deep=True) if sc_features.empty: return sc_features, None dists = (sc_features[['row', 'col']] - sc_point[['row', 'col']]) radius = int(np.ceil(dists.abs().values.max() * clipping_buffer)) logger.debug('{} transmission features found in clipped area with ' 'radius {} and minimum max voltage of {}' .format(len(sc_features), radius, tie_line_voltage)) # Find t-lines connected to substations within clip logger.debug('Collecting transmission lines connected to substations') mask = sc_features['category'] == SUBSTATION_CAT if mask.any(): trans_gids = sc_features.loc[mask, 'trans_gid'].values trans_gids = \ np.concatenate(self.sub_lines_mapping.loc[trans_gids].values) trans_gids = np.unique(trans_gids) line_mask = self.features['trans_gid'].isin(trans_gids) trans_lines = self.features.loc[line_mask].copy(deep=True) line_mask = trans_lines['trans_gid'].isin(sc_features['trans_gid']) trans_lines = trans_lines.loc[~line_mask] logger.debug('Adding all {} transmission lines connected to ' 'substations with minimum max voltage of {}' .format(len(trans_lines), tie_line_voltage)) sc_features = pd.concat([sc_features, trans_lines]) return sc_features, radius def _clip_to_radius(self, sc_point, radius, sc_features, clipping_buffer, expand_radius=True): """Clip features to radius. If no features are found within the initial radius, it is expanded (linearly by incrementally increasing the clipping buffer) until at least one connection feature is found. """ if radius is None or len(sc_features) == 0: return sc_features # Get pixel resolution and calculate buffer with ExclusionLayers(self._cost_fpath) as ds: resolution = ds.profile["transform"][0] radius_m = radius * resolution logger.debug('Clipping features to radius {}m'.format(radius_m)) buffer = sc_point["geometry"].buffer(radius_m) clipped_sc_features = sc_features.clip(buffer) clipping_buffer -= 0.05 while expand_radius and len(clipped_sc_features) <= 0: clipping_buffer += 0.05 radius_m = radius * resolution * clipping_buffer logger.debug('Clipping features to radius {}m'.format(radius_m)) buffer = sc_point["geometry"].buffer(radius_m) clipped_sc_features = sc_features.clip(buffer) logger.debug('{} transmission features found in clipped area with ' 'radius {}' .format(len(clipped_sc_features), radius_m)) return clipped_sc_features.copy(deep=True)
[docs] def process_sc_points(self, capacity_class, sc_point_gids=None, nn_sinks=2, clipping_buffer=1.05, barrier_mult=100, max_workers=None, save_paths=False, radius=None, expand_radius=True, mp_delay=3, simplify_geo=None): """ Compute Least Cost Transmission for desired sc_points Parameters ---------- capacity_class : str | int Capacity class of transmission features to connect supply curve points to sc_point_gids : list, optional List of sc_point_gids to connect to, by default connect to all nn_sinks : int, optional Number of nearest neighbor sinks to use for clipping radius calculation, by default 2 clipping_buffer : float, optional Buffer to expand clipping radius by, by default 1.05 barrier_mult : int, optional Transmission barrier multiplier, used when computing the least cost tie-line path, by default 100 max_workers : int, optional Number of workers to use for processing, if 1 run in serial, if None use all available cores, by default None save_paths : bool, optional Flag to return least cost paths as a multi-line geometry, by default False radius : None | int, optional Force clipping radius. Trasmission features beyond this radius will not be considered for connection with supply curve point. If ``None``, no radius is forced, and connections to all available transmission features are computed. By default, ``None``. expand_radius : bool, optional Option to expand radius to include at least one connection feature. Has no effect if ``radius=None``. By default, ``True``. mp_delay : float, optional Delay in seconds between starting multi-process workers. Useful for reducing memory spike at working startup. simplify_geo : float | None, optional If float, simplify geometries using this value Returns ------- least_costs : pandas.DataFrame | gpd.GeoDataFrame Least cost connections between all supply curve points and the transmission features with the given capacity class that are within "nn_sink" nearest infinite sinks """ max_workers = os.cpu_count() if max_workers is None else max_workers if sc_point_gids is None: sc_point_gids = self.sc_points['sc_point_gid'].values tie_line_voltage = self._config.capacity_to_kv(capacity_class) logger.debug('Using a barrier multiplier of %s', barrier_mult) if max_workers > 1: logger.info('Computing Least Cost Transmission for SC points in ' 'parallel on {} workers'.format(max_workers)) least_costs = self._process_multi_core( capacity_class, tie_line_voltage, sc_point_gids=sc_point_gids, nn_sinks=nn_sinks, clipping_buffer=clipping_buffer, barrier_mult=barrier_mult, save_paths=save_paths, radius=radius, expand_radius=expand_radius, mp_delay=mp_delay, simplify_geo=simplify_geo, max_workers=max_workers) else: logger.info('Computing Least Cost Transmission for {:,} SC points ' 'in serial'.format(len(sc_point_gids))) least_costs = self._process_single_core( capacity_class, tie_line_voltage, sc_point_gids=sc_point_gids, nn_sinks=nn_sinks, clipping_buffer=clipping_buffer, barrier_mult=barrier_mult, save_paths=save_paths, radius=radius, expand_radius=expand_radius, simplify_geo=simplify_geo) if not least_costs: return pd.DataFrame(columns=['sc_point_gid']) least_costs = pd.concat(least_costs).sort_values(['sc_point_gid', 'trans_gid']) capacity_class = self._config._parse_cap_class(capacity_class) least_costs['max_cap'] = self._config['power_classes'][capacity_class] lcp_frac = (len(least_costs['sc_point_gid'].unique()) / len(sc_point_gids) * 100) logger.info('{:.4f}% of requested sc point gids were successfully ' 'mapped to transmission features'.format(lcp_frac)) return least_costs.reset_index(drop=True)
def _process_multi_core(self, capacity_class, tie_line_voltage, sc_point_gids, nn_sinks=2, clipping_buffer=1.05, barrier_mult=100, max_workers=2, save_paths=False, radius=None, expand_radius=True, mp_delay=3, simplify_geo=None): """ Compute Least Cost Transmission for desired sc_points using multiple cores. Parameters ---------- capacity_class : str | int Capacity class of transmission features to connect supply curve points to tie_line_voltage : int Tie-line voltage (kV) sc_point_gids : list | set List of sc_point_gids to connect to, by default connect to all nn_sinks : int, optional Number of nearest neighbor sinks to use for clipping radius calculation, by default 2 clipping_buffer : float, optional Buffer to expand clipping radius by, by default 1.05 barrier_mult : int, optional Transmission barrier multiplier, used when computing the least cost tie-line path, by default 100 max_workers : int, optional Number of workers to use for processing save_paths : bool, optional Flag to return least cost paths as a multi-line geometry, by default False radius : None | int, optional Force clipping radius. Transmission features beyond this radius will not be considered for connection with supply curve point. If ``None``, no radius is forced, and connections to all available transmission features are computed. By default, ``None``. expand_radius : bool, optional Option to expand radius to include at least one connection feature. Has no effect if ``radius=None``. By default, ``True``. mp_delay : float, optional Delay in seconds between starting multi-process workers. Useful for reducing memory spike at working startup. simplify_geo : float | None, optional If float, simplify geometries using this value Returns ------- least_costs : pandas.DataFrame | gpd.GeoDataFrame Least cost connections between all supply curve points and the transmission features with the given capacity class that are within "nn_sink" nearest infinite sinks """ loggers = [__name__, 'reV', 'reVX'] with SpawnProcessPool(max_workers=max_workers, loggers=loggers) as exe: least_costs = self. _compute_paths_in_chunks( exe, max_workers, sc_point_gids, tie_line_voltage, nn_sinks, clipping_buffer, radius, expand_radius, mp_delay, capacity_class, barrier_mult, save_paths, simplify_geo) return least_costs def _compute_paths_in_chunks(self, exe, max_submissions, sc_point_gids, tie_line_voltage, nn_sinks, clipping_buffer, radius, expand_radius, mp_delay, capacity_class, barrier_mult, save_paths, simplify_geo): """Compute LCP's in parallel using futures. """ futures, paths = {}, [] num_jobs = 1 for __, sc_point in self.sc_points.iterrows(): gid = sc_point['sc_point_gid'] if gid not in sc_point_gids: continue sc_features, sc_radius = self._clip_to_sc_point( sc_point, tie_line_voltage, nn_sinks=nn_sinks, clipping_buffer=clipping_buffer, radius=radius, expand_radius=expand_radius) if sc_features.empty: continue start_cost = _starting_cost(self._cost_fpath, sc_point.copy(deep=True), sc_features, capacity_class, radius=sc_radius, xmission_config=self._config, barrier_mult=barrier_mult) if start_cost < 0: logger.debug("Could not connect SC point {} to " "transmission features: Invalid start cost!" .format(gid)) continue future = exe.submit(TransCapCosts.run, self._cost_fpath, sc_point.copy(deep=True), sc_features, capacity_class, radius=sc_radius, xmission_config=self._config, barrier_mult=barrier_mult, min_line_length=self._min_line_len, save_paths=save_paths, simplify_geo=simplify_geo) futures[future] = None num_jobs += 1 if num_jobs <= max_submissions: time.sleep(mp_delay) logger.debug('Submitted {} futures'.format(num_jobs)) log_mem(logger) if num_jobs % max_submissions == 0: paths = _collect_future_chunks(futures, paths) paths = _collect_future_chunks(futures, paths) return paths def _process_single_core(self, capacity_class, tie_line_voltage, sc_point_gids, nn_sinks=2, clipping_buffer=1.05, barrier_mult=100, save_paths=False, radius=None, expand_radius=True, simplify_geo=None): """ Compute Least Cost Transmission for desired sc_points with a single core. Parameters ---------- capacity_class : str | int Capacity class of transmission features to connect supply curve points to tie_line_voltage : int Tie-line voltage (kV) sc_point_gids : list | set List of sc_point_gids to connect to, by default connect to all nn_sinks : int, optional Number of nearest neighbor sinks to use for clipping radius calculation, by default 2 clipping_buffer : float, optional Buffer to expand clipping radius by, by default 1.05 barrier_mult : int, optional Transmission barrier multiplier, used when computing the least cost tie-line path, by default 100 save_paths : bool, optional Flag to return least cost paths as a multi-line geometry, by default False radius : None | int, optional Force clipping radius. Transmission feature beyond this radius will not be considered for connection with supply curve point. If ``None``, no radius is forced, and connections to all available transmission features are computed. By default, ``None``. expand_radius : bool, optional Option to expand radius to include at least one connection feature. Has no effect if ``radius=None``. By default, ``True``. simplify_geo : float | None, optional If float, simplify geometries using this value Returns ------- least_costs : pandas.DataFrame | gpd.GeoDataFrame Least cost connections between all supply curve points and the transmission features with the given capacity class that are within "nn_sink" nearest infinite sinks """ least_costs = [] count = 0 for _, sc_point in self.sc_points.iterrows(): gid = sc_point['sc_point_gid'] if gid in sc_point_gids: sc_features, sc_radius = self._clip_to_sc_point( sc_point, tie_line_voltage, nn_sinks=nn_sinks, clipping_buffer=clipping_buffer, radius=radius, expand_radius=expand_radius) if sc_features.empty: continue sc_costs = TransCapCosts.run( self._cost_fpath, sc_point.copy(deep=True), sc_features, capacity_class, radius=sc_radius, xmission_config=self._config, barrier_mult=barrier_mult, min_line_length=self._min_line_len, save_paths=save_paths, simplify_geo=simplify_geo) if sc_costs is not None: least_costs.append(sc_costs) count += 1 logger.info('SC point {} of {} complete!' .format(count, len(sc_point_gids))) log_mem(logger) return least_costs
[docs] @classmethod def run(cls, cost_fpath, features_fpath, capacity_class, resolution=128, xmission_config=None, min_line_length=0, sc_point_gids=None, nn_sinks=2, clipping_buffer=1.05, barrier_mult=100, max_workers=None, save_paths=False, radius=None, expand_radius=True, simplify_geo=None): """ Find Least Cost Transmission connections between desired sc_points to given transmission features for desired capacity class Parameters ---------- cost_fpath : str Path to h5 file with cost rasters and other required layers features_fpath : str Path to GeoPackage with transmission features capacity_class : str | int Capacity class of transmission features to connect supply curve points to resolution : int, optional SC point resolution, by default 128 xmission_config : str | dict | XmissionConfig, optional Path to Xmission config .json, dictionary of Xmission config .jsons, or preloaded XmissionConfig objects, by default None min_line_length : int | float, optional Minimum line length in km, by default 0. sc_point_gids : list, optional List of sc_point_gids to connect to, by default None nn_sinks : int, optional Number of nearest neighbor sinks to use for clipping radius calculation, by default 2 clipping_buffer : float, optional Buffer to expand clipping radius by, by default 1.05 barrier_mult : int, optional Transmission barrier multiplier, used when computing the least cost tie-line path, by default 100 max_workers : int, optional Number of workers to use for processing, if 1 run in serial, if None use all available cores, by default None save_paths : bool, optional Flag to return least costs path as a multi-line geometry, by default False radius : None | int, optional Force clipping radius. Transmission features beyond this radius wil not be considered for connection with supply curve point. If ``None``, no radius is forced, and connections to all available transmission features are computed. By default, ``None``. expand_radius : bool, optional Option to expand radius to include at least one connection feature. Has no effect if ``radius=None``. By default, ``True``. simplify_geo : float | None, optional If float, simplify geometries using this value Returns ------- least_costs : pandas.DataFrame | gpd.DataFrame Least cost connections between all supply curve points and the transmission features with the given capacity class that are within "nn_sink" nearest infinite sinks """ ts = time.time() lcx = cls(cost_fpath, features_fpath, resolution=resolution, xmission_config=xmission_config, min_line_length=min_line_length) least_costs = lcx.process_sc_points(capacity_class, sc_point_gids=sc_point_gids, nn_sinks=nn_sinks, clipping_buffer=clipping_buffer, barrier_mult=barrier_mult, max_workers=max_workers, save_paths=save_paths, radius=radius, expand_radius=expand_radius, simplify_geo=simplify_geo) logger.info('{} connections were made to {} SC points in {:.4f} ' 'minutes' .format(len(least_costs), len(least_costs['sc_point_gid'].unique()), (time.time() - ts) / 60)) return least_costs
[docs]class ReinforcedXmission(LeastCostXmission): """ Compute Least Cost tie-line paths and full transmission cap cost for all supply curve points to all possible connections (substations within the SC reinforcement region). """ def __init__(self, cost_fpath, features_fpath, regions_fpath, region_identifier_column, resolution=128, xmission_config=None, min_line_length=0, allow_connections_within_states=False): """ Parameters ---------- cost_fpath : str Path to h5 file with cost rasters and other required layers. features_fpath : str Path to GeoPackage with transmission features. All features except substations will be dropped. This table must have a `region_identifier` column which matches one of the `region_identifier` ID's in the reinforcement regions GeoPackage. regions_fpath : str Path to GeoPackage with reinforcement regions. This table must have the specified `region_identifier` column which matches the `region_identifier` column ID's in the `features_fpath` GeoPackage. region_identifier_column : str Name of column in reinforcement regions GeoPackage containing a unique identifier for each region. resolution : int, optional SC point resolution. By default, ``128``. xmission_config : str | dict | XmissionConfig, optional Path to Xmission config .json, dictionary of Xmission config .jsons, or preloaded XmissionConfig objects. By default, ``None``. min_line_length : int | float, optional Minimum line length in km. By default, ``0``. """ super().__init__(cost_fpath=cost_fpath, features_fpath=features_fpath, resolution=resolution, xmission_config=xmission_config, min_line_length=min_line_length) self._regions = (gpd.read_file(regions_fpath) .to_crs(self.features.crs)) self._rid_column = region_identifier_column self.allow_connections_within_states = allow_connections_within_states @staticmethod def _load_trans_feats(features_fpath): """Load existing substations from disk. """ logger.debug('Loading substations...') substations = gpd.read_file(features_fpath) substations = substations[substations.category == SUBSTATION_CAT] substations = substations.reset_index(drop=True) substations = substations.drop(columns=['bgid', 'egid', 'cap_left'], errors='ignore') mapping = {'gid': 'trans_gid', 'trans_gids': 'trans_line_gids'} substations = substations.rename(columns=mapping) return substations, None def _clip_to_sc_point(self, sc_point, tie_line_voltage, nn_sinks=2, clipping_buffer=1.05, radius=None, expand_radius=True): """Clip features to be substations in the region of the sc point. """ logger.debug('Clipping features to sc_point {}'.format(sc_point.name)) point = self.sc_points.loc[sc_point.name:sc_point.name].centroid map_func = reinforcement_region_mapper(self._regions, self._rid_column) rid = point.apply(map_func).values[0] logger.debug(" - Clipping features to reinforcement region: {}" .format(rid)) mask = self.features[self._rid_column] == rid sc_features = self.features.loc[mask].copy(deep=True) logger.debug('{} transmission features found in clipped area ' .format(len(sc_features))) if radius is not None: sc_features = self._clip_to_radius(sc_point, radius, sc_features, clipping_buffer, expand_radius) mask = self.features['max_volts'] >= tie_line_voltage sc_features = sc_features.loc[mask].copy(deep=True) if sc_features.empty: return sc_features, None dists = (sc_features[['row', 'col']] - sc_point[['row', 'col']]) radius = int(np.ceil(dists.abs().values.max() * clipping_buffer)) logger.debug('{} transmission features found in clipped area of ' 'radius {} with minimum max voltage of {}' .format(len(sc_features), radius, tie_line_voltage)) return sc_features, radius
[docs] @classmethod def run(cls, cost_fpath, features_fpath, regions_fpath, region_identifier_column, capacity_class, resolution=128, xmission_config=None, min_line_length=0, sc_point_gids=None, clipping_buffer=1.05, barrier_mult=100, max_workers=None, simplify_geo=None, save_paths=False, radius=None, expand_radius=True): """ Find Least Cost Transmission connections between desired sc_points and substations in their reinforcement region. Parameters ---------- cost_fpath : str Path to h5 file with cost rasters and other required layers features_fpath : str Path to GeoPackage with transmission features. All features except substations will be dropped. This table must have a `region_identifier` column which matches one of the `region_identifier` ID's in the reinforcement regions GeoPackage. regions_fpath : str Path to GeoPackage with reinforcement regions. This table must have the specified `region_identifier` column which matches the `region_identifier` column ID's in the `features_fpath` GeoPackage. region_identifier_column : str Name of column in reinforcement regions GeoPackage containing a unique identifier for each region. capacity_class : str | int Capacity class of transmission features to connect supply curve points to. resolution : int, optional SC point resolution. By default, ``128``. xmission_config : str | dict | XmissionConfig, optional Path to Xmission config .json, dictionary of Xmission config .jsons, or preloaded XmissionConfig objects. By default, ``None``. min_line_length : int | float, optional Minimum line length in km. By default, ``0``. sc_point_gids : list, optional List of sc_point_gids to connect to. By default, ``None``, which processes all points. clipping_buffer : float, optional Buffer to expand clipping radius by. By default, ``1.05``. barrier_mult : int, optional Multiplier on transmission barrier costs. By default, ``100``. max_workers : int, optional Number of workers to use for processing. If 1 run in serial, if ``None`` use all available cores. By default, ``None``. simplify_geo : float | None, optional If float, simplify geometries using this value. save_paths : bool, optional Flag to save reinforcement line path as a multi-line geometry. By default, ``False``. radius : None | int, optional Force clipping radius. Substations beyond this radius will not be considered for connection with supply curve point. If ``None``, no radius is forced, and connections to all available transmission features are computed. By default, ``None``. expand_radius : bool, optional Option to expand radius to include at least one connection feature. Has no effect if ``radius=None``. By default, ``True``. Returns ------- least_costs : pandas.DataFrame | gpd.DataFrame Least cost connections between all supply curve points and the substations in their reinforcement region with the given capacity class. """ ts = time.time() lcx = cls(cost_fpath, features_fpath, regions_fpath, region_identifier_column, resolution, xmission_config, min_line_length) least_costs = lcx.process_sc_points(capacity_class, sc_point_gids=sc_point_gids, clipping_buffer=clipping_buffer, barrier_mult=barrier_mult, max_workers=max_workers, save_paths=save_paths, radius=radius, expand_radius=expand_radius, simplify_geo=simplify_geo) logger.info('{} connections were made to {} SC points in {:.4f} ' 'minutes' .format(len(least_costs), len(least_costs['sc_point_gid'].unique()), (time.time() - ts) / 60)) return least_costs
[docs]def reinforcement_region_mapper(regions, region_identifier_column): """Generate a function to map points to a reinforcement region. The returned mapping function maps a point to a unique value from the `region_identifier_column` column in the input GeoPackage. Parameters ---------- regions : gpd.GeoPackage GeoPackage defining the reinforcement regions. This table must have a `region_identifier_column` column which uniquely identifies the region, as well as a geometry for each region. """ def _map_reinforcement_region(point): """Find the reinforcement region ID for the input point. """ idx = regions.distance(point).sort_values().index[0] return regions.loc[idx, region_identifier_column] return _map_reinforcement_region
def _collect_future_chunks(futures, least_cost_paths): """Collect all futures from the input dictionary. """ num_to_collect = len(futures) for i, future in enumerate(as_completed(futures), start=1): futures.pop(future) sc_costs = future.result() if sc_costs is not None: least_cost_paths.append(sc_costs) logger.debug('Collected {} of {} futures!'.format(i, num_to_collect)) log_mem(logger) return least_cost_paths def _starting_cost(cost_fpath, point, sc_features, capacity_class, **kwargs): """Extract the starting point cost""" cost_calculator = TransCapCosts(cost_fpath, point, sc_features, capacity_class, **kwargs) return cost_calculator.mcp_cost[cost_calculator.row, cost_calculator.col]