# -*- coding: utf-8 -*-
# pylint: disable=all
"""
Least Cost Xmission Command Line Interface
TODO - add cmd line doc
"""
import os
import sys
import click
import logging
import warnings
import pandas as pd
import geopandas as gpd
from typing import List
from rex.utilities.loggers import init_mult, create_dirs, init_logger
from rex.utilities.cli_dtypes import STR, INTLIST, INT, FLOAT
from rex.utilities.hpc import SLURM
from rex.utilities.utilities import get_class_properties
from reVX import __version__
from reVX.config.least_cost_xmission import LeastCostXmissionConfig
from reVX.least_cost_xmission.least_cost_xmission import (LeastCostXmission,
ReinforcedXmission)
from reVX.least_cost_xmission.config import (TRANS_LINE_CAT, LOAD_CENTER_CAT,
SINK_CAT, SUBSTATION_CAT)
from reVX.least_cost_xmission.least_cost_paths import min_reinforcement_costs
TRANS_CAT_TYPES = [TRANS_LINE_CAT, LOAD_CENTER_CAT, SINK_CAT, SUBSTATION_CAT]
logger = logging.getLogger(__name__)
@click.group()
@click.version_option(version=__version__)
@click.option('--name', '-n', default='LeastCostXmission', type=STR,
show_default=True,
help='Job name.')
@click.option('--verbose', '-v', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def main(ctx, name, verbose):
"""
Least Cost Xmission Command Line Interface
"""
ctx.ensure_object(dict)
ctx.obj['VERBOSE'] = verbose
ctx.obj['NAME'] = name
@main.command()
def valid_config_keys():
"""
Echo the valid Least Cost Xmission config keys
"""
click.echo(', '.join(get_class_properties(LeastCostXmissionConfig)))
@main.command()
@click.option('--config', '-c', required=True,
type=click.Path(exists=True),
help='Filepath to Least Cost Xmission config json file.')
@click.option('--verbose', '-v', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def from_config(ctx, config, verbose):
"""
Run Least Cost Xmission from a config.
"""
config = LeastCostXmissionConfig(config)
option = config.execution_control.option
if 'VERBOSE' in ctx.obj:
if any((ctx.obj['VERBOSE'], verbose)):
config._log_level = logging.DEBUG
elif verbose:
config._log_level = logging.DEBUG
if option == 'local':
run_local(ctx, config)
return
if option not in {'eagle', 'kestrel'}:
click.echo('Option "{}" is not supported'.format(option))
return
if config.execution_control.nodes == 1:
eagle(config, config.sc_point_gids)
return
# Split gids over mulitple SLURM jobs
name = config.name
logger.info('Splitting SC points over {} SLURM jobs'
.format(config.execution_control.nodes))
for i in range(config.execution_control.nodes):
config.name = '{}_{}'.format(name, i)
eagle(config, config.sc_point_gids[i::config.execution_control.nodes])
@main.command()
@click.option('--cost_fpath', '-cost', type=click.Path(exists=True),
required=True,
help=("Path to h5 file with cost rasters and other required "
"layers"))
@click.option('--features_fpath', '-feats', required=True,
type=click.Path(exists=True),
help="Path to GeoPackage with transmission features")
@click.option('--regions_fpath', '-regs', type=STR, show_default=True,
default=None,
help=("Path to reinforcement regions GeoPackage. If not `None`, "
"Least Cost Xmission is run with reinforcement path "
"costs. Features must be substations only, and the "
"substation file must contain a "
"`region_identifier_column` column that matches the "
"`region_identifier_column` ID in this file for the "
"reinforcement region containing that substation. "))
@click.option('--region_identifier_column', '-rid', type=STR, default=None,
help=("Name of column in reinforcement regions GeoPackage"
"containing a unique identifier for each region."))
@click.option('--capacity_class', '-cap', type=str, required=True,
help=("Capacity class of transmission features to connect "
"supply curve points to"))
@click.option('--resolution', '-res', type=int,
show_default=True, default=128,
help=("SC point resolution"))
@click.option('--xmission_config', '-xcfg', type=STR, show_default=True,
default=None,
help=("Path to Xmission config .json"))
@click.option('--min_line_length', '-mll', type=int,
show_default=True, default=0,
help=("Minimum Tie-line length."))
@click.option('--sc_point_gids', '-gids', type=INTLIST, show_default=True,
default=None, help=("List of sc_point_gids to connect to. If "
"running `from_config`, this can also be a "
"path to a CSV file with a 'sc_point_gid' "
"column containing the GID's to run. Note "
"the missing 's' in the column name - this "
"makes it seamless to run on a supply curve "
"output from reV"))
@click.option('--nn_sinks', '-nn', type=int,
show_default=True, default=2,
help=("Number of nearest neighbor sinks to use for clipping "
"radius calculation. This is overridden by --radius"))
@click.option('--clipping_buffer', '-buffer', type=float,
show_default=True, default=1.05,
help=("Buffer to expand clipping radius by"))
@click.option('--barrier_mult', '-bmult', type=float,
show_default=True, default=100,
help=("Tranmission barrier multiplier, used when computing the "
"least cost tie-line path"))
@click.option('--max_workers', '-mw', type=INT,
show_default=True, default=None,
help=("Number of workers to use for processing, if 1 run in "
"serial, if None use all available cores"))
@click.option('--out_dir', '-o', type=STR, default='./out',
show_default=True,
help='Directory to save least cost xmission values to.')
@click.option('--log_dir', '-log', default=None, type=STR,
show_default=True,
help='Directory to dump log files.')
@click.option('--verbose', '-v', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.option('--save_paths', is_flag=True,
help='Save least cost paths and data to GeoPackage.')
@click.option('--radius', '-rad', type=INT,
show_default=True, default=None,
help=("Radius to clip costs raster to in pixels This overrides "
"--nn_sinks if set."))
@click.option('--expand_radius', '-er', is_flag=True,
help='Flag to expand radius until at least one transmission '
'feature is included for connection. Has no effect if '
'radius input is ``None``.')
@click.option('--simplify-geo', type=FLOAT,
show_default=True, default=None,
help=("Simplify path geometries by a value before writing to "
"GeoPackage."))
@click.option('--cost-layers', '-cl', required=True, multiple=True,
default=(),
help='Layer in H5 to add to total cost raster used for routing. '
'Multiple layers may be specified. Layer name may have '
'curly brackets (``{}``), which will be filled in '
'based on the capacity class input (e.g. '
'"tie_line_costs_{}MW")')
@click.option('--li-cost-layers', '-licl', required=False, multiple=True,
default=(),
help='Length-invariant cost layer in H5 to add to total cost '
'raster used for routing. These costs do not scale with '
'distance traversed acroiss the cell. Multiple layers may '
'be specified.')
@click.pass_context
def local(ctx, cost_fpath, features_fpath, regions_fpath,
region_identifier_column, capacity_class, resolution,
xmission_config, min_line_length, sc_point_gids, nn_sinks,
clipping_buffer, barrier_mult, max_workers, out_dir, log_dir,
verbose, save_paths, radius, expand_radius, simplify_geo,
cost_layers: List[str], li_cost_layers):
"""
Run Least Cost Xmission on local hardware
"""
name = ctx.obj['NAME']
if 'VERBOSE' in ctx.obj:
verbose = any((ctx.obj['VERBOSE'], verbose))
log_modules = [__name__, 'reVX', 'reV', 'rex']
init_mult(name, log_dir, modules=log_modules, verbose=verbose)
create_dirs(out_dir)
logger.info('Computing Least Cost Xmission connections and writing them {}'
.format(out_dir))
kwargs = {"resolution": resolution,
"xmission_config": xmission_config,
"min_line_length": min_line_length,
"sc_point_gids": sc_point_gids,
"clipping_buffer": clipping_buffer,
"barrier_mult": barrier_mult,
"max_workers": max_workers,
"save_paths": save_paths,
"simplify_geo": simplify_geo,
"radius": radius,
"expand_radius": expand_radius,
"length_invariant_cost_layers": li_cost_layers}
if regions_fpath is not None:
least_costs = ReinforcedXmission.run(cost_fpath, features_fpath,
regions_fpath,
region_identifier_column,
capacity_class, cost_layers,
**kwargs)
else:
kwargs["nn_sinks"] = nn_sinks
least_costs = LeastCostXmission.run(cost_fpath, features_fpath,
capacity_class, cost_layers,
**kwargs)
if len(least_costs) == 0:
logger.error('No paths found.')
return
ext = 'gpkg' if save_paths else 'csv'
fn_out = '{}_{}_{}.{}'.format(name, capacity_class, resolution, ext)
fpath_out = os.path.join(out_dir, fn_out)
logger.info('Writing output to {}'.format(fpath_out))
if save_paths:
least_costs.to_file(fpath_out, driver='GPKG')
else:
least_costs.to_csv(fpath_out, index=False)
logger.info('Writing output complete')
@main.command()
@click.option('--split-to-geojson', '-s', is_flag=True,
help='After merging GeoPackages, split into GeoJSON by POI name'
'.')
@click.option('--suppress-combined-file', is_flag=True,
help='Don\'t create combined layer.')
@click.option('--out-file', '-of', default=None, type=STR,
help='Name for output GeoPackage/CSV file.')
@click.option('--drop', '-d', default=None, type=STR, multiple=True,
help=('Transmission feature category types to drop from '
'results. Options: {}'.format(", ".join(TRANS_CAT_TYPES))))
@click.option('--out-dir', '-od', type=click.Path(),
default='./out', show_default=True,
help='Output directory for output files.')
@click.option('--simplify-geo', type=FLOAT,
show_default=True, default=None,
help='Simplify path geometries by a value before exporting.')
@click.argument('files', type=STR, nargs=-1)
@click.pass_context
# flake8: noqa: C901
def merge_output(ctx, split_to_geojson, suppress_combined_file, out_file,
out_dir, drop, simplify_geo, files):
"""
Merge output GeoPackage/CSV files and optionally convert to GeoJSON
"""
log_level = "DEBUG" if ctx.obj.get('VERBOSE') else "INFO"
init_logger('reVX', log_level=log_level)
if len(files) == 0:
logger.error('No files passed to be merged')
return
if drop:
for cat in drop:
if cat not in TRANS_CAT_TYPES:
logger.info('--drop options must on or more of {}, received {}'
.format(TRANS_CAT_TYPES, drop))
return
warnings.filterwarnings('ignore', category=RuntimeWarning)
dfs = []
for i, file in enumerate(files, start=1):
logger.info('Loading %s (%i/%i)', file, i, len(files))
df_tmp = gpd.read_file(file) if "gpkg" in file else pd.read_csv(file)
dfs.append(df_tmp)
df = pd.concat(dfs)
warnings.filterwarnings('default', category=RuntimeWarning)
if drop:
mask = df['category'].isin(drop)
logger.info('Dropping {} of {} total features with category(ies): {}'
.format(mask.sum(), len(df), ", ".join(drop)))
df = df[~mask]
df = df.reset_index()
if len(df) == 0:
logger.info('No transmission features to save.')
return
if simplify_geo:
logger.info('Simplifying geometries by {}'.format(simplify_geo))
df.geometry = df.geometry.simplify(simplify_geo)
if all(col in df for col in ["gid", "reinforcement_cost_per_mw"]):
df = min_reinforcement_costs(df)
create_dirs(out_dir)
# Create combined output file
if not suppress_combined_file:
out_file = ('combo_{}'.format(files[0])
if out_file is None else out_file)
out_file = os.path.join(out_dir, out_file)
logger.info('Saving all combined paths to %s', out_file)
if "gpkg" in out_file:
df.to_file(out_file, driver="GPKG")
else:
df.to_csv(out_file, index=False)
# Split out put in to GeoJSON by POI name
if split_to_geojson:
if not isinstance(df, gpd.GeoDataFrame):
click.echo('Geo-spatial aware input files must be provided to '
'split to Geo-JSON.')
sys.exit(1)
pois = set(df['POI Name'])
for i, poi in enumerate(pois, start=1):
safe_poi_name = poi.replace(' ', '_').replace('/', '_')
out_file = os.path.join(out_dir, f"{safe_poi_name}_paths.geojson")
paths = df[df['POI Name'] == poi].to_crs(epsg=4326)
logger.info('Writing {} paths for {} to {} ({}/{})'
.format(len(paths), poi, out_file, i, len(pois)))
paths.to_file(out_file, driver="GeoJSON")
@main.command()
@click.option('--cost_fpath', '-f', required=True,
type=click.Path(exists=True),
help=("Path to GeoPackage/CSV file with calculated transmission "
"costs. This file must have a 'trans_gid' column that "
"will be used to merge in the reinforcement costs."))
@click.option('--reinforcement_cost_fpath', '-r', required=True,
type=click.Path(exists=True),
help=("Path to GeoPackage/CSV file with calculated "
"reinforcement costs. This file must have a 'gid' column "
"that will be used to merge in the reinforcement costs."))
@click.option('--out_file', '-of', default=None, type=STR,
help='Name for output GeoPackage/CSV file.')
@click.pass_context
def merge_reinforcement_costs(ctx, cost_fpath, reinforcement_cost_fpath,
out_file):
"""
Merge reinforcement costs into transmission costs.
"""
log_level = "DEBUG" if ctx.obj.get('VERBOSE') else "INFO"
init_logger('reVX', log_level=log_level)
logger.info("Merging reinforcement costs into transmission costs...")
costs = (gpd.read_file(cost_fpath)
if "gpkg" in cost_fpath
else pd.read_csv(cost_fpath))
r_costs = (gpd.read_file(reinforcement_cost_fpath)
if "gpkg" in reinforcement_cost_fpath
else pd.read_csv(reinforcement_cost_fpath))
logger.info("Loaded spur-line costs for {:,} substations and "
"reinforcement costs for {:,} substations"
.format(len(costs["trans_gid"].unique()),
len(r_costs["gid"].unique())))
r_costs.index = r_costs.gid
costs = costs[costs["trans_gid"].isin(r_costs.gid)].copy()
logger.info("Found {:,} substations with both spur-line and "
"reinforcement costs"
.format(len(costs["trans_gid"].unique())))
r_cols = ["reinforcement_poi_lat", "reinforcement_poi_lon",
"reinforcement_dist_km", "reinforcement_cost_per_mw"]
costs[r_cols] = r_costs.loc[costs["trans_gid"], r_cols].values
logger.info("Writing output to {!r}".format(out_file))
if "gpkg" in out_file:
costs.to_file(out_file, driver="GPKG", index=False)
else:
costs.to_csv(out_file, index=False)
[docs]def get_node_cmd(config, gids):
"""
Get the node CLI call for Least Cost Xmission
Parameters
----------
config : reVX.config.least_cost_xmission.LeastCostXmissionConfig
Least Cost Xmission config object.
gids : list
List of SC point GID values to submit to local command.
Returns
-------
cmd : str
CLI call to submit to SLURM execution.
"""
args = ['-n {}'.format(SLURM.s(config.name)),
'local',
'-cost {}'.format(SLURM.s(config.cost_fpath)),
'-feats {}'.format(SLURM.s(config.features_fpath)),
'-regs {}'.format(SLURM.s(config.regions_fpath)),
'-rid {}'.format(SLURM.s(config.region_identifier_column)),
'-cap {}'.format(SLURM.s(config.capacity_class)),
'-res {}'.format(SLURM.s(config.resolution)),
'-xcfg {}'.format(SLURM.s(config.xmission_config)),
'-mll {}'.format(SLURM.s(config.min_line_length)),
'-gids {}'.format(SLURM.s(gids)),
'-nn {}'.format(SLURM.s(config.nn_sinks)),
'-buffer {}'.format(SLURM.s(config.clipping_buffer)),
'-bmult {}'.format(SLURM.s(config.barrier_mult)),
'-mw {}'.format(SLURM.s(config.execution_control.max_workers)),
'-o {}'.format(SLURM.s(config.dirout)),
'-log {}'.format(SLURM.s(config.log_directory)),
]
for layer in config.cost_layers:
args.append(f'-cl {layer}')
for layer in config.length_invariant_cost_layers:
args.append(f'-licl {layer}')
if config.save_paths:
args.append('--save_paths')
if config.radius:
args.append('-rad {}'.format(config.radius))
if config.expand_radius:
args.append('-er')
if config.simplify_geo:
args.append('--simplify-geo {}'.format(config.simplify_geo))
if config.log_level == logging.DEBUG:
args.append('-v')
cmd = ('python -m reVX.least_cost_xmission.least_cost_xmission_cli {}'
.format(' '.join(args)))
logger.debug('Submitting the following cli call:\n\t{}'.format(cmd))
return cmd
[docs]def run_local(ctx, config):
"""
Run Least Cost Xmission locally using config
Parameters
----------
ctx : click.ctx
click ctx object
config : reVX.config.least_cost_xmission.LeastCostXmissionConfig
Least Cost Xmission config object.
"""
ctx.obj['NAME'] = config.name
ctx.invoke(local,
cost_fpath=config.cost_fpath,
features_fpath=config.features_fpath,
regions_fpath=config.regions_fpath,
capacity_class=config.capacity_class,
resolution=config.resolution,
xmission_config=config.xmission_config,
min_line_length=config.min_line_length,
sc_point_gids=config.sc_point_gids,
nn_sinks=config.nn_sinks,
clipping_buffer=config.clipping_buffer,
barrier_mult=config.barrier_mult,
region_identifier_column=config.region_identifier_column,
max_workers=config.execution_control.max_workers,
out_dir=config.dirout,
log_dir=config.log_directory,
verbose=config.log_level,
radius=config.radius,
expand_radius=config.expand_radius,
save_paths=config.save_paths,
simplify_geo=config.simplify_geo,
cost_layers=config.cost_layers,
li_cost_layers=config.length_invariant_cost_layers,
)
[docs]def eagle(config, gids):
"""
Run Least Cost Xmission on Eagle HPC.
Parameters
----------
config : reVX.config.least_cost_xmission.LeastCostXmissionConfig
Least Cost Xmission config object.
"""
init_logger('rex', log_level='DEBUG')
cmd = get_node_cmd(config, gids)
name = config.name
log_dir = config.log_directory
stdout_path = os.path.join(log_dir, 'stdout/')
slurm_manager = SLURM()
logger.info('Running Least Cost Xmission on Eagle with '
'node name "{}"'.format(name))
out = slurm_manager.sbatch(cmd,
alloc=config.execution_control.allocation,
memory=config.execution_control.memory,
walltime=config.execution_control.walltime,
feature=config.execution_control.feature,
name=name, stdout_path=stdout_path,
conda_env=config.execution_control.conda_env,
module=config.execution_control.module)[0]
if out:
msg = ('Kicked off Least Cost Xmission "{}" '
'(SLURM jobid #{}) on Eagle.'
.format(name, out))
else:
msg = ('Was unable to kick off Least Cost Xmission '
'"{}". Please see the stdout error messages'
.format(name))
click.echo(msg)
logger.info(msg)
if __name__ == '__main__':
try:
main(obj={})
except Exception:
logger.exception('Error running Least Cost Xmission CLI')
raise