# -*- coding: utf-8 -*-
# pylint: disable=all
"""
Least Cost Xmission Command Line Interface
"""
from warnings import warn
import click
import logging
import os
import json
from pathlib import Path
import numpy as np
import geopandas as gpd
from rex.utilities.loggers import init_logger, init_mult, create_dirs
from rex.utilities.cli_dtypes import STR, INT
from rex.utilities.hpc import SLURM
from rex.utilities.utilities import get_class_properties
from reVX.config.least_cost_xmission import LeastCostPathsConfig
from reVX.least_cost_xmission.config import XmissionConfig
from reVX.least_cost_xmission.least_cost_paths import (LeastCostPaths,
ReinforcementPaths)
from reVX.least_cost_xmission.least_cost_xmission import (
reinforcement_region_mapper
)
from reVX.least_cost_xmission.config import TRANS_LINE_CAT, SUBSTATION_CAT
from reVX import __version__
# Module-level logger named after this module; the `local` command passes
# this module's name to `init_mult` when it sets up logging.
logger = logging.getLogger(__name__)
@click.group()
@click.version_option(version=__version__)
@click.option('--name', '-n', default='LeastCostPaths', type=STR,
              show_default=True,
              help='Job name.')
@click.option('--verbose', '-v', is_flag=True,
              help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def main(ctx, name, verbose):
    """
    Least Cost Paths Command Line Interface
    """
    # Make sure a dict is attached to the context, then stash the
    # group-level options on it so that sub-commands can read them.
    ctx.ensure_object(dict)
    ctx.obj.update(VERBOSE=verbose, NAME=name)
@main.command()
def valid_config_keys():
    """
    Echo the valid Least Cost Paths config keys
    """
    # Collect the property names of the config class and print them as a
    # single comma-separated line.
    config_keys = get_class_properties(LeastCostPathsConfig)
    click.echo(', '.join(config_keys))
def run_local(ctx, config):
    """
    Run Least Cost Paths locally using config

    Parameters
    ----------
    ctx : click.ctx
        click ctx object
    config : reVX.config.least_cost_xmission.LeastCostPathsConfig
        Least Cost Paths config object.
    """
    ctx.obj['NAME'] = config.name

    # `local` takes a boolean verbose flag. `config.log_level` is a logging
    # level (compared against `logging.DEBUG` elsewhere in this module), so
    # passing it straight through would be truthy for every level and turn
    # debug logging on unconditionally.
    verbose = config.log_level == logging.DEBUG

    ctx.invoke(local,
               cost_fpath=config.cost_fpath,
               features_fpath=config.features_fpath,
               network_nodes_fpath=config.network_nodes_fpath,
               transmission_lines_fpath=config.transmission_lines_fpath,
               capacity_class=config.capacity_class,
               xmission_config=config.xmission_config,
               clip_buffer=config.clip_buffer,
               # A local run processes every feature in a single job
               start_index=0, step_index=1,
               barrier_mult=config.barrier_mult,
               max_workers=config.execution_control.max_workers,
               region_identifier_column=config.region_identifier_column,
               save_paths=config.save_paths,
               out_dir=config.dirout,
               log_dir=config.log_directory,
               verbose=verbose)
@main.command()
@click.option('--config', '-c', required=True,
              type=click.Path(exists=True),
              help='Filepath to Least Cost Paths config json file.')
@click.option('--verbose', '-v', is_flag=True,
              help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def from_config(ctx, config, verbose):
    """
    Run Least Cost Paths from a config.
    """
    config = LeastCostPathsConfig(config)

    # Debug logging can be requested on the group (`main -v`) or on this
    # command; either one forces the config log level to DEBUG.
    if verbose or ctx.obj.get('VERBOSE', False):
        config._log_level = logging.DEBUG

    option = config.execution_control.option
    if option == 'local':
        run_local(ctx, config)
        return

    if option not in {'eagle', 'kestrel'}:
        click.echo('Option "{}" is not supported'.format(option))
        return

    # Treat a missing/zero node count as a single node, matching the
    # `nodes or 1` fallback used when the node command is built.
    num_nodes = config.execution_control.nodes or 1

    # No need to add index to file name
    if num_nodes == 1:
        eagle(config)
        return

    name = config.name
    logger.info('Splitting features over {} SLURM jobs'.format(num_nodes))
    n_zfill = len(str(num_nodes))
    for i in range(num_nodes):
        # Each job gets a zero-padded suffix and its own start index; the
        # step between features equals the number of nodes.
        config.name = '{}_{}'.format(name, str(i).zfill(n_zfill))
        eagle(config, start_index=i)
@main.command()
@click.option('--cost_fpath', '-cost', type=click.Path(exists=True),
              required=True,
              help=("Path to h5 file with cost rasters and other required "
                    "layers"))
@click.option('--features_fpath', '-feats', required=True,
              type=click.Path(exists=True),
              help="Path to GeoPackage with transmission features")
@click.option('--capacity_class', '-cap', type=str, required=True,
              help=("Capacity class of transmission features to connect "
                    "supply curve points to"))
@click.option('--network_nodes_fpath', '-nn', type=STR, show_default=True,
              default=None,
              help=("Path to Network Nodes GeoPackage. If given alongside "
                    "`transmission_lines_fpath`, reinforcement path cost "
                    "calculation is run."))
@click.option('--transmission_lines_fpath', '-tl', type=STR, show_default=True,
              default=None,
              help=("Path to Transmission lines GeoPackage. This file can "
                    "contain other features, but transmission lines must "
                    "be identified by {!r}. If given alongside "
                    "`network_nodes_fpath`, reinforcement path cost "
                    "calculation is run.".format(TRANS_LINE_CAT)))
@click.option('--xmission_config', '-xcfg', type=STR, show_default=True,
              default=None,
              help=("Path to transmission config .json"))
@click.option('--clip_buffer', '-cb', type=int,
              show_default=True, default=0,
              help="Optional number of array elements to buffer clip area by.")
@click.option('--start_index', '-start', type=int,
              show_default=True, default=0,
              help=("Start index of features to run."))
@click.option('--step_index', '-step', type=int,
              show_default=True, default=1,
              help=("Step index of features to run."))
@click.option('--barrier_mult', '-bmult', type=float,
              show_default=True, default=100,
              help=("Transmission barrier multiplier, used when computing the "
                    "least cost tie-line path"))
@click.option('--max_workers', '-mw', type=INT,
              show_default=True, default=None,
              help=("Number of workers to use for processing, if 1 run in "
                    "serial, if None use all available cores"))
@click.option('--region_identifier_column', '-rid', type=STR, default=None,
              help=("Name of column in reinforcement regions GeoPackage "
                    "containing a unique identifier for each region."))
@click.option('--save_paths', '-paths', is_flag=True,
              help="Flag to save least cost path as a multi-line geometry")
@click.option('--out_dir', '-o', type=STR, default='./',
              show_default=True,
              help='Directory to save least cost Paths values to.')
@click.option('--log_dir', '-log', default=None, type=STR,
              show_default=True,
              help='Directory to dump log files.')
@click.option('--verbose', '-v', is_flag=True,
              help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def local(ctx, cost_fpath, features_fpath, capacity_class, network_nodes_fpath,
          transmission_lines_fpath, xmission_config, clip_buffer, start_index,
          step_index, barrier_mult, max_workers, region_identifier_column,
          save_paths, out_dir, log_dir, verbose):
    """
    Run Least Cost Paths on local hardware
    """
    name = ctx.obj['NAME']
    # The group-level verbose flag and this command's flag are OR-ed
    if 'VERBOSE' in ctx.obj:
        verbose = any((ctx.obj['VERBOSE'], verbose))

    log_modules = [__name__, 'reVX', 'reV', 'rex']
    init_mult(name, log_dir, modules=log_modules, verbose=verbose)

    create_dirs(out_dir)
    logger.info('Computing Least Cost Paths connections and writing them to {}'
                .format(out_dir))

    xmission_config = XmissionConfig(config=xmission_config)
    logger.debug('Xmission Config: {}'.format(xmission_config))

    # A reinforcement run needs BOTH the network nodes and the transmission
    # lines; with only one of them the plain least cost paths run is used.
    have_nodes = network_nodes_fpath is not None
    have_lines = transmission_lines_fpath is not None
    is_reinforcement_run = have_nodes and have_lines
    if have_nodes != have_lines:
        logger.warning('Only one of `network_nodes_fpath` and '
                       '`transmission_lines_fpath` was given; both are '
                       'required for a reinforcement run. Running the '
                       'standard Least Cost Paths calculation instead.')

    # The start features differ between the two modes (network nodes vs.
    # transmission features); the index selection is identical.
    start_fpath = network_nodes_fpath if is_reinforcement_run \
        else features_fpath
    features = gpd.read_file(start_fpath)
    features, *__ = LeastCostPaths._map_to_costs(cost_fpath, features)
    # Strided slice: this job handles every `step_index`-th feature
    # beginning at `start_index`.
    indices = features.index[start_index::step_index]

    run_kwargs = {"clip_buffer": int(clip_buffer),
                  "barrier_mult": barrier_mult,
                  "indices": indices,
                  "save_paths": save_paths}
    if is_reinforcement_run:
        least_costs = ReinforcementPaths.run(cost_fpath, features_fpath,
                                             network_nodes_fpath,
                                             region_identifier_column,
                                             transmission_lines_fpath,
                                             capacity_class,
                                             xmission_config=xmission_config,
                                             **run_kwargs)
    else:
        least_costs = LeastCostPaths.run(cost_fpath, features_fpath,
                                         capacity_class,
                                         max_workers=max_workers,
                                         **run_kwargs)

    # Output file name: <job name>_<capacity>MW_<voltage>kV.{gpkg,csv}
    capacity_class = xmission_config._parse_cap_class(capacity_class)
    cap = xmission_config['power_classes'][capacity_class]
    kv = xmission_config.capacity_to_kv(capacity_class)
    fn_out = '{}_{}MW_{}kV'.format(name, cap, kv)
    fpath_out = os.path.join(out_dir, fn_out)

    if save_paths:
        # Path geometries are requested, so write a GeoPackage
        fpath_out += '.gpkg'
        least_costs.to_file(fpath_out, driver="GPKG", index=False)
    else:
        fpath_out += '.csv'
        least_costs.to_csv(fpath_out, index=False)
@main.command()
@click.option('--features_fpath', '-feats', required=True,
              type=click.Path(exists=True),
              help="Path to GeoPackage with substation and transmission "
                   "features")
@click.option('--regions_fpath', '-regs', required=True,
              type=click.Path(exists=True),
              help=("Path to reinforcement regions GeoPackage."))
@click.option('--region_identifier_column', '-rid', required=True,
              type=STR,
              help=("Name of column in reinforcement regions GeoPackage "
                    "containing a unique identifier for each region."))
@click.option('--network_nodes_fpath', '-nodes', default=None, type=STR,
              help=("Path to network nodes GeoPackage. If this input is "
                    "included, the `region_identifier_column` is added if "
                    "it is missing."))
@click.option('--out_file', '-of', default=None, type=STR,
              help='Name for output GeoPackage file.')
@click.pass_context
def map_ss_to_rr(ctx, features_fpath, regions_fpath, region_identifier_column,
                 network_nodes_fpath, out_file):
    """
    Map substation locations to reinforcement regions.

    Reinforcement regions are user-defined. Typical regions are
    Balancing Areas, States, or Counties, though custom regions are also
    allowed. Each region must be supplied with a unique identifier in
    the input file.

    This method also removes substations that do not meet the min 69 kV
    voltage requirement and adds {'min_volts', 'max_volts'} fields to
    the remaining substations.

    .. Important:: This method DOES NOT clip the substations to the
      reinforcement regions boundary. All substations will be mapped to
      their closest region. It is your responsibility to remove any
      substations outside of the analysis region before calling this
      method.

    Doing the pre-processing step avoids any issues with substations
    being left out or double counted if they were simply clipped to the
    reinforcement region shapes.
    """
    # The substation output is always written, so fail before doing any
    # work instead of crashing at the final write when no name was given.
    if out_file is None:
        raise click.BadParameter("An output GeoPackage file name is "
                                 "required to save the mapped substations.",
                                 param_hint="--out_file")

    log_level = "DEBUG" if ctx.obj.get('VERBOSE') else "INFO"
    init_logger('reVX', log_level=log_level)

    features = gpd.read_file(features_fpath)
    # Regions are re-projected to the features CRS before mapping
    regions = gpd.read_file(regions_fpath).to_crs(features.crs)

    # reset_index gives a 0..n-1 index so that the labels yielded by
    # `iterrows` below can index the positional `bad_subs` mask.
    substations = (features[features.category == SUBSTATION_CAT]
                   .reset_index(drop=True).dropna(axis="columns", how="all"))

    logger.info("Mapping {:,d} substation locations to {:,d} reinforcement "
                "regions".format(substations.shape[0], regions.shape[0]))

    map_func = reinforcement_region_mapper(regions, region_identifier_column)
    centroids = substations.centroid
    substations[region_identifier_column] = centroids.apply(map_func)

    logger.info("Calculating min/max voltage for each substation...")
    bad_subs = np.zeros(len(substations), dtype=bool)
    for idx, row in substations.iterrows():
        lines = row['trans_gids']
        if isinstance(lines, str):
            # gids of the attached lines may be stored as a JSON string
            lines = json.loads(lines)

        lines_mask = features['gid'].isin(lines)
        voltage = features.loc[lines_mask, 'voltage'].values

        # A substation without any matching line has no voltage at all;
        # treat it like one below the threshold rather than letting
        # `np.max` raise on an empty array.
        if voltage.size > 0 and np.max(voltage) >= 69:
            substations.loc[idx, 'min_volts'] = np.min(voltage)
            substations.loc[idx, 'max_volts'] = np.max(voltage)
        else:
            bad_subs[idx] = True

    if bad_subs.any():
        msg = ("The following sub-stations do not have the minimum "
               "required voltage of 69 kV and will be dropped:\n{}"
               .format(substations.loc[bad_subs, 'gid']))
        logger.warning(msg)
        warn(msg)
        substations = substations.loc[~bad_subs].reset_index(drop=True)

    logger.info("Writing substation output to {!r}".format(out_file))
    substations.to_file(out_file, driver="GPKG", index=False)

    if network_nodes_fpath is None:
        return

    network_nodes_fpath = Path(network_nodes_fpath)
    network_nodes = gpd.read_file(network_nodes_fpath).to_crs(features.crs)
    if region_identifier_column in network_nodes:
        msg = ("Network nodes file {!r} was specified but it "
               "already contains the {!r} column. No data modified!"
               .format(str(network_nodes_fpath), region_identifier_column))
        logger.warning(msg)
        warn(msg)
        return

    centroids = network_nodes.centroid
    network_nodes[region_identifier_column] = centroids.apply(map_func)

    # The updated nodes are written next to the input file, using the
    # input file's stem with a ".gpkg" extension.
    out_fn = "{}.gpkg".format(network_nodes_fpath.stem)
    out_fp = network_nodes_fpath.parent / out_fn
    logger.info("Writing updated network node data to {!r}"
                .format(str(out_fp)))
    network_nodes.to_file(out_fp, driver="GPKG")
def get_node_cmd(config, start_index=0):
    """
    Get the node CLI call for Least Cost Paths

    Parameters
    ----------
    config : reVX.config.least_cost_xmission.LeastCostPathsConfig
        Least Cost Paths config object.
    start_index : int, optional
        Value passed to the `local` command's `-start` option, i.e. the
        index of the first feature this node should process. By default 0.

    Returns
    -------
    cmd : str
        CLI call to submit to SLURM execution.
    """
    args = ['-n {}'.format(SLURM.s(config.name)),
            'local',
            '-cost {}'.format(SLURM.s(config.cost_fpath)),
            '-feats {}'.format(SLURM.s(config.features_fpath)),
            '-nn {}'.format(SLURM.s(config.network_nodes_fpath)),
            '-tl {}'.format(SLURM.s(config.transmission_lines_fpath)),
            '-rid {}'.format(SLURM.s(config.region_identifier_column)),
            '-cap {}'.format(SLURM.s(config.capacity_class)),
            # Forward the transmission config as the local run does;
            # otherwise HPC jobs would ignore a user-supplied config.
            '-xcfg {}'.format(SLURM.s(config.xmission_config)),
            '-cb {}'.format(SLURM.s(config.clip_buffer)),
            '-start {}'.format(SLURM.s(start_index)),
            # Step by the node count so the jobs split the features
            # between them; a falsy node count means a single job.
            '-step {}'.format(SLURM.s(config.execution_control.nodes or 1)),
            '-bmult {}'.format(SLURM.s(config.barrier_mult)),
            '-mw {}'.format(SLURM.s(config.execution_control.max_workers)),
            '-o {}'.format(SLURM.s(config.dirout)),
            '-log {}'.format(SLURM.s(config.log_directory)),
            ]

    # Boolean flags are only appended when they are switched on
    if config.save_paths:
        args.append('-paths')

    if config.log_level == logging.DEBUG:
        args.append('-v')

    cmd = ('python -m reVX.least_cost_xmission.least_cost_paths_cli {}'
           .format(' '.join(args)))
    logger.debug('Submitting the following cli call:\n\t{}'.format(cmd))

    return cmd
def eagle(config, start_index=0):
    """
    Run Least Cost Paths on a SLURM HPC system (Eagle or Kestrel).

    Parameters
    ----------
    config : reVX.config.least_cost_xmission.LeastCostPathsConfig
        Least Cost Paths config object.
    start_index : int, optional
        Start index forwarded to the node CLI call built for this job.
        By default 0.
    """
    cmd = get_node_cmd(config, start_index)
    name = config.name
    log_dir = config.log_directory
    stdout_path = os.path.join(log_dir, 'stdout/')

    # `from_config` routes both the 'eagle' and 'kestrel' execution options
    # through this function, so name the configured system in the messages
    # instead of always reporting "Eagle".
    hpc = str(config.execution_control.option).capitalize()

    slurm_manager = SLURM()

    logger.info('Running Least Cost Paths on {} with '
                'node name "{}"'.format(hpc, name))
    # Only the first element of the sbatch result is used; it is truthy
    # when the submission succeeded and is reported as the job id.
    out = slurm_manager.sbatch(cmd,
                               alloc=config.execution_control.allocation,
                               memory=config.execution_control.memory,
                               walltime=config.execution_control.walltime,
                               feature=config.execution_control.feature,
                               name=name, stdout_path=stdout_path,
                               conda_env=config.execution_control.conda_env,
                               module=config.execution_control.module)[0]
    if out:
        msg = ('Kicked off Least Cost Paths "{}" '
               '(SLURM jobid #{}) on {}.'
               .format(name, out, hpc))
    else:
        msg = ('Was unable to kick off Least Cost Paths '
               '"{}". Please see the stdout error messages'
               .format(name))

    click.echo(msg)
    logger.info(msg)
if __name__ == '__main__':
    # Script entry point: run the click group with a fresh context object.
    # Any failure is logged with its traceback and then re-raised so the
    # process still exits with an error.
    try:
        main(obj={})
    except Exception:
        logger.exception('Error running Least Cost Paths CLI')
        raise