# Source code for reVX.rpm.rpm_cli

# -*- coding: utf-8 -*-
"""
RPM Command Line Interface
"""
import click
import logging
import os
from rex.utilities.loggers import init_mult
from rex.utilities.cli_dtypes import STR, INT
from rex.utilities.hpc import SLURM
from rex.utilities.utilities import dict_str_load, get_class_properties

from reVX.config.rpm import (RPMConfig, ClusterConfigGroup,
                             RepProfilesConfigGroup)
from reVX.rpm.rpm_manager import RPMClusterManager as rpm_cm
from reVX.rpm.rpm_output import RPMOutput as rpm_o
from reVX.utilities.exceptions import RPMRuntimeError
from reVX import __version__

# Module-level logger, named after this module, used by the CLI commands below
logger = logging.getLogger(__name__)


@click.group()
@click.version_option(version=__version__)
@click.option('--name', '-n', default='RPM', type=STR,
              show_default=True,
              help='Job name. Default is "RPM".')
@click.option('--verbose', '-v', is_flag=True,
              help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def main(ctx, name, verbose):
    """
    RPM Command Line Interface
    """
    # Make sure the shared context object is a dict, then stash the
    # group-level options so sub-commands can read them from ctx.obj
    ctx.ensure_object(dict)
    ctx.obj.update(VERBOSE=verbose, NAME=name)


@main.command()
def valid_config_keys():
    """
    Echo the valid RPM config keys
    """
    for config_cls in (RPMConfig, ClusterConfigGroup, RepProfilesConfigGroup):
        # Keep only the text after the last '.' in the class' string
        # representation and drop the trailing quote / angle bracket
        label = str(config_cls).rpartition('.')[2].strip("'>")
        keys = ', '.join(get_class_properties(config_cls))
        click.echo("Valid keys for {}: {}".format(label, keys))


def run_local(ctx, config):
    """
    Run the reVX-RPM pipeline locally from a config

    Invokes the "local" group and then, when the matching config sections
    are present, the "cluster" and "rep_profiles" commands.

    Parameters
    ----------
    ctx : click.ctx
        click ctx object
    config : RPMConfig
        RPM Config object
    """
    ctx.obj['NAME'] = config.name
    # Forward the config's debug setting: from_config records a verbose
    # request on the config, so it has to be handed to "local" explicitly
    # (same check that get_node_cmd uses to add "-v" to the node command)
    ctx.invoke(local,
               out_dir=config.dirout,
               cf_profiles=config.cf_profiles,
               log_dir=config.log_directory,
               max_workers=config.execution_control.max_workers,
               verbose=config.log_level == logging.DEBUG)

    cluster_config = config.cluster
    if cluster_config is not None:
        ctx.invoke(cluster,
                   rpm_meta=cluster_config.rpm_meta,
                   region_col=cluster_config.region_col,
                   dist_rank_filter=cluster_config.dist_rank_filter,
                   contiguous_filter=cluster_config.contiguous_filter)

    profiles_config = config.rep_profiles
    if profiles_config is not None:
        ctx.invoke(rep_profiles,
                   rpm_clusters=profiles_config.rpm_clusters,
                   exclusions=profiles_config.exclusions,
                   excl_dict=profiles_config.excl_dict,
                   techmap_dset=profiles_config.techmap_dset,
                   trg_bins=profiles_config.trg_bins,
                   trg_dset=profiles_config.trg_dset,
                   n_profiles=profiles_config.n_profiles,
                   forecast_fpath=profiles_config.forecast_fpath)
@main.command()
@click.option('--config', '-c', required=True,
              type=click.Path(exists=True),
              help='Filepath to reVX-rpm config json file.')
@click.option('--verbose', '-v', is_flag=True,
              help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def from_config(ctx, config, verbose):
    """
    Run reVX-rpm from a config.
    """
    config = RPMConfig(config)

    # Debug logging is requested by either the group-level or the
    # command-level verbose flag
    if verbose or ctx.obj.get('VERBOSE', False):
        config._log_level = logging.DEBUG

    option = config.execution_control.option
    if option == 'local':
        run_local(ctx, config)
    elif option == 'eagle':
        eagle(config)
    else:
        # Do not exit silently when nothing was run
        msg = ('Execution option "{}" is not supported by the RPM CLI; '
               'nothing was run. Use "local" or "eagle".'.format(option))
        logger.warning(msg)
        click.echo(msg)


@main.group(chain=True)
@click.option('--out_dir', '-o', required=True, type=click.Path(),
              help='Directory to dump output files')
@click.option('--cf_profiles', '-cf', required=True,
              type=click.Path(exists=True),
              help=('Path to reV .h5 file containing desired capacity factor '
                    'profiles'))
@click.option('--log_dir', '-log', default=None, type=STR,
              show_default=True,
              help='Directory to dump log files. Default is out_dir.')
@click.option('--max_workers', '-mw', type=INT, default=None,
              show_default=True,
              help=('Number of parallel workers. 1 will run serial, '
                    'None will use all available.'))
@click.option('--verbose', '-v', is_flag=True,
              help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def local(ctx, out_dir, cf_profiles, log_dir, max_workers, verbose):
    """
    Run reVX-RPM on local hardware.
    """
    ctx.obj['OUT_DIR'] = out_dir
    # exist_ok avoids the check-then-create race of a separate exists() test
    os.makedirs(out_dir, exist_ok=True)

    if log_dir is None:
        log_dir = out_dir

    name = ctx.obj['NAME']
    if 'VERBOSE' in ctx.obj:
        verbose = any((ctx.obj['VERBOSE'], verbose))

    log_modules = [__name__, 'reVX', 'reV', 'rex']
    init_mult(name, log_dir, modules=log_modules, verbose=verbose)

    logger.info('Running reV to RPM pipeline\n'
                'Outputs to be stored in: {}'.format(out_dir))

    # Shared inputs read by the chained "cluster" / "rep_profiles" commands
    ctx.obj['CF_PROFILES'] = cf_profiles
    ctx.obj['MAX_WORKERS'] = max_workers


@local.command()
@click.option('--rpm_meta', '-m', required=True,
              type=click.Path(exists=True),
              help='Path to .csv or .json containing the RPM meta data:'
              '- Categorical regions of interest with column label "region"'
              '- # of clusters per region with column label "clusters"'
              '- A column that maps the RPM regions to the cf_fpath meta data:'
              ' "res_gid" (priorized) or "gen_gid". This can be omitted if '
              ' the rpm_region_col kwarg input is found in the cf_fpath meta')
@click.option('--region_col', '-reg', type=STR, default=None,
              show_default=True,
              help='The meta-data field to map RPM regions to')
@click.option('--dist_rank_filter', '-drf', is_flag=True,
              help=('Re-cluster data by minimizing the sum of the: '
                    'distance between each point and each cluster centroid'))
@click.option('--contiguous_filter', '-cf', is_flag=True,
              help=('Flag to re-classify clusters by making contigous cluster '
                    'polygons'))
@click.pass_context
def cluster(ctx, rpm_meta, region_col, dist_rank_filter, contiguous_filter):
    """
    Cluster RPM Regions
    """
    # region_col uses the STR click type like the other nullable options in
    # this file, rather than the plain str it was declared with
    name = ctx.obj['NAME']
    cf_profiles = ctx.obj['CF_PROFILES']
    out_dir = ctx.obj['OUT_DIR']
    max_workers = ctx.obj['MAX_WORKERS']

    logger.info('Clustering regions based on:\n{}'.format(rpm_meta))
    rpm_clusters = rpm_cm.run_clusters(cf_profiles, rpm_meta, out_dir,
                                       job_tag=name,
                                       rpm_region_col=region_col,
                                       max_workers=max_workers,
                                       dist_rank_filter=dist_rank_filter,
                                       contiguous_filter=contiguous_filter,
                                       method_kwargs={"n_init": 10})
    logger.info('reVX - RPM clustering methods complete.')

    # Hand the result to a chained "rep_profiles" command
    ctx.obj['RPM_CLUSTERS'] = rpm_clusters


@local.command()
@click.option('--rpm_clusters', '-rc', type=STR, default=None,
              help=('Path to pre-existing RPM cluster results .csv with '
                    '(gid, gen_gid, cluster_id, rank)'))
@click.option('--exclusions', '-excl', default=None,
              type=click.Path(exists=True), show_default=True,
              help=('Filepath to exclusions data (must match the techmap grid)'
                    ' None will not apply exclusions.'))
@click.option('--excl_dict', '-exd', default=None, type=STR,
              show_default=True,
              help='String representation of a dictionary of exclusion '
              'LayerMask arguments {layer: {kwarg: value}} where layer is a '
              'dataset in excl_fpath and kwarg can be "inclusion_range", '
              '"exclude_values", "include_values", "use_as_weights", '
              'or "weight".')
@click.option('--techmap_dset', '-tmd', default=None, type=STR,
              show_default=True,
              help=('Dataset name in the techmap file containing the '
                    'exclusions-to-resource mapping data.'))
@click.option('--trg_bins', '-trg', default=None, type=STR,
              show_default=True,
              help=('Filepath to a single-column CSV containing ordered '
                    'TRG bin edges.'))
@click.option('--trg_dset', '-trgd', default='lcoe_fcr', type=STR,
              show_default=True,
              help=('TRG dataset found in cf_fpath that is associated with '
                    'the TRG bins'))
@click.option('--n_profiles', '-np', type=INT, default=1,
              show_default=True,
              help=('Number of profiles per cluster to export.'))
@click.option('--forecast_fpath', '-fcst', type=STR, default=None,
              show_default=True,
              help=('reV generation output file for forecast data. If this is '
                    'input, profiles will be taken from forecast file instead '
                    'of the cf file, based on a NN mapping.'))
@click.pass_context
def rep_profiles(ctx, rpm_clusters, exclusions, excl_dict, techmap_dset,
                 trg_bins, trg_dset, n_profiles, forecast_fpath):
    """
    Extract representative profiles from RPM clusters
    """
    name = ctx.obj['NAME']
    cf_profiles = ctx.obj['CF_PROFILES']
    out_dir = ctx.obj['OUT_DIR']
    max_workers = ctx.obj['MAX_WORKERS']

    if rpm_clusters is None:
        # Fall back to the clusters produced by a chained "cluster" command
        if 'RPM_CLUSTERS' not in ctx.obj:
            msg = ('You must run "cluster" or provide path to existing '
                   'RPM clusters to extract representative profiles!')
            logger.error(msg)
            raise RPMRuntimeError(msg)

        rpm_clusters = ctx.obj['RPM_CLUSTERS']

    logger.info('Extracting representative profiles from RPM clusters: {}'
                .format(rpm_clusters))
    logger.info('Extracting representative profiles using exclusions: {}'
                .format(exclusions))
    if trg_bins is not None:
        logger.info('Applying TRGs from dset "{}" : {}'
                    .format(trg_dset, trg_bins))

    # The command line hands excl_dict over as a string; a direct invocation
    # may pass something else, which is forwarded untouched
    if isinstance(excl_dict, str):
        excl_dict = dict_str_load(excl_dict)

    rpm_o.process_outputs(rpm_clusters, cf_profiles, exclusions, excl_dict,
                          techmap_dset, out_dir, job_tag=name,
                          max_workers=max_workers, trg_bins=trg_bins,
                          trg_dset=trg_dset)

    if forecast_fpath is not None or n_profiles > 1:
        logger.info('Extracting extra representative profiles from: {}'
                    .format(rpm_clusters))
        if forecast_fpath is not None:
            logger.info('Using forecast file: {}'.format(forecast_fpath))

        rpm_o.extract_profiles(rpm_clusters, cf_profiles, out_dir,
                               n_profiles=n_profiles, job_tag=name,
                               max_workers=max_workers,
                               forecast_fpath=forecast_fpath)
def get_node_cmd(config):
    """
    Get the node CLI call for the reVX-RPM pipeline.

    Parameters
    ----------
    config : reVX.config.rpm.RPMConfig
        reVX-RPM config object.

    Returns
    -------
    cmd : str
        CLI call to submit to SLURM execution.
    """
    args = ['-n {}'.format(SLURM.s(config.name)),
            'local',
            '-o {}'.format(SLURM.s(config.dirout)),
            '-cf {}'.format(SLURM.s(config.cf_profiles)),
            '-mw {}'.format(SLURM.s(config.execution_control.max_workers)),
            '-log {}'.format(SLURM.s(config.log_directory)),
            ]

    if config.log_level == logging.DEBUG:
        args.append('-v')

    cluster_config = config.cluster
    if cluster_config is not None:
        cluster = ['cluster',
                   '-m {}'.format(SLURM.s(cluster_config.rpm_meta)),
                   ]
        # --region_col is declared with a plain str type, so a formatted
        # None would be read back as text; omit it to keep the default
        if cluster_config.region_col is not None:
            cluster.append('-reg {}'
                           .format(SLURM.s(cluster_config.region_col)))

        if cluster_config.dist_rank_filter:
            cluster.append('-drf')

        if cluster_config.contiguous_filter:
            cluster.append('-cf')

        args.extend(cluster)

    # Attributes are only read here, so the config group is used directly
    rep_profiles = config.rep_profiles
    if rep_profiles is not None:
        profiles = ['rep_profiles',
                    '-rc {}'.format(SLURM.s(rep_profiles.rpm_clusters)),
                    ]
        # --exclusions is a click.Path(exists=True) option, so a formatted
        # None would be rejected as a missing path; omit it instead
        if rep_profiles.exclusions is not None:
            profiles.append('-excl {}'
                            .format(SLURM.s(rep_profiles.exclusions)))

        profiles.extend([
            '-exd {}'.format(SLURM.s(rep_profiles.excl_dict)),
            # short flag must match the "-tmd" declared on rep_profiles
            '-tmd {}'.format(SLURM.s(rep_profiles.techmap_dset)),
            '-trg {}'.format(SLURM.s(rep_profiles.trg_bins)),
            '-trgd {}'.format(SLURM.s(rep_profiles.trg_dset)),
            '-np {}'.format(SLURM.s(rep_profiles.n_profiles)),
            '-fcst {}'.format(SLURM.s(rep_profiles.forecast_fpath)),
        ])

        args.extend(profiles)

    cmd = 'python -m reVX.rpm.rpm_cli {}'.format(' '.join(args))
    logger.debug('Submitting the following cli call:\n\t{}'.format(cmd))

    return cmd
def eagle(config):
    """
    Submit the reVX-RPM pipeline as a SLURM job on the Eagle HPC.

    Parameters
    ----------
    config : reVX.config.rpm.RPMConfig
        reVX-RPM config object.
    """
    node_cmd = get_node_cmd(config)
    job_name = config.name
    # SLURM stdout files go into a "stdout/" folder under the log directory
    stdout_dir = os.path.join(config.log_directory, 'stdout/')

    logger.info('Running reVX-RPM pipeline on Eagle with '
                'node name "{}"'.format(job_name))

    scheduler = SLURM()
    sbatch_kwargs = {
        'name': job_name,
        'stdout_path': stdout_dir,
        'alloc': config.execution_control.allocation,
        'memory': config.execution_control.memory,
        'walltime': config.execution_control.walltime,
        'feature': config.execution_control.feature,
        'module': config.execution_control.module,
        'conda_env': config.execution_control.conda_env,
    }
    # Only the first element of the sbatch result is used; it is reported
    # as the job id and is falsy when the submission failed
    job_id = scheduler.sbatch(node_cmd, **sbatch_kwargs)[0]

    if not job_id:
        msg = ('Was unable to kick off reVX-RPM pipeline job "{}". '
               'Please see the stdout error messages'
               .format(job_name))
    else:
        msg = ('Kicked off reVX-RPM pipeline job "{}" '
               '(SLURM jobid #{}) on Eagle.'
               .format(job_name, job_id))

    click.echo(msg)
    logger.info(msg)
# Script entry point: run the click group with an empty shared context object
if __name__ == '__main__':
    try:
        main(obj={})
    except Exception:
        # Record the traceback through the module logger, then re-raise so
        # the failure is not swallowed
        logger.exception('Error running RPM CLI')
        raise