# -*- coding: utf-8 -*-
"""
Execution utilities.
"""
from abc import ABC, abstractmethod
import subprocess
import logging
import getpass
import shlex
from warnings import warn
import re
from rex.utilities.execution import SubprocessManager
from rex.utilities.exceptions import (ExecutionError, HpcError, SlurmWarning,
PbsWarning)
logger = logging.getLogger(__name__)
class HpcJobManager(SubprocessManager, ABC):
"""Abstract HPC job manager framework"""
# get username as class attribute.
USER = getpass.getuser()
# HPC queue column headers
QCOL_NAME = None # Job name column
QCOL_ID = None # Job integer ID column
QCOL_STATUS = None # Job status column
# set a max job name length, will raise error if too long.
MAX_NAME_LEN = 100
# default rows to skip in queue stdout
QSKIP = None
def __init__(self, user=None, queue_dict=None):
"""
Parameters
----------
user : str | None
HPC username. None will get your username using getpass.getuser()
queue_dict : dict | None
Parsed HPC queue dictionary (qstat for PBS or squeue for SLURM)
from parse_queue_str(). None will get the queue from PBS or SLURM.
"""
self._user = user
if self._user is None:
self._user = self.USER
if queue_dict is not None and not isinstance(queue_dict, dict):
            emsg = ('HPC queue_dict arg must be None or a dict but '
                    'received: {}, {}'.format(queue_dict, type(queue_dict)))
logger.error(emsg)
raise HpcError(emsg)
self._queue = queue_dict
@staticmethod
def _skip_q_rows(queue_str, skip_rows):
"""Remove rows from the queue_str that are to be skipped.
Parameters
----------
queue_str : str
HPC queue output string. Can be split on line breaks to get list.
skip_rows : int | list | None
Optional row index values to skip.
Returns
-------
queue_str : str
HPC queue output string. Can be split on line breaks to get list.
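
        Examples
        --------
        A minimal illustrative example (row values are made up):

        >>> qstr = '''row0
        ... row1
        ... row2'''
        >>> print(HpcJobManager._skip_q_rows(qstr, skip_rows=0))
        row1
        row2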
"""
if skip_rows is not None:
if isinstance(skip_rows, int):
skip_rows = [skip_rows]
queue_str = [row for i, row in enumerate(queue_str.split('\n'))
if i not in skip_rows]
queue_str = '\n'.join(queue_str)
return queue_str
@classmethod
def parse_queue_str(cls, queue_str, keys=0):
"""Parse the qstat or squeue output string into a dict format keyed by
integer job id with nested dictionary of job properties (queue
printout columns).
Parameters
----------
queue_str : str
HPC queue output string (qstat for PBS or squeue for SLURM).
Typically a space-delimited string with line breaks.
        keys : list | int
            Queue job attributes (column headers). If an integer, it is
            the index of the row that contains the space-delimited column
            headers (default 0). Can also be a list that explicitly sets
            the column headers.
Returns
-------
queue_dict : dict
HPC queue parsed into dictionary format keyed by integer job id
with nested dictionary of job properties (queue printout columns).
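
        Examples
        --------
        A minimal sketch with a SLURM-style queue printout (all values
        are made up):

        >>> qstr = '''JOBID NAME ST
        ... 123 job1 R
        ... 124 job2 PD'''
        >>> queue = SLURM.parse_queue_str(qstr)
        >>> queue[123]['NAME'], queue[123]['ST']
        ('job1', 'R')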
"""
queue_dict = {}
queue_rows = queue_str.split('\n')
if isinstance(keys, int):
del_index = keys
keys = [k.strip(' ')
for k in queue_rows[keys].strip(' ').split(' ')
if k != '']
del queue_rows[del_index]
        for row in queue_rows:
            job = [k.strip(' ') for k in row.strip(' ').split(' ') if k != '']
            if not job:
                # skip empty rows (e.g. a trailing newline)
                continue
            # PBS job ids can look like "1234.server"; keep the leading int
            job_id = int(job[keys.index(cls.QCOL_ID)].split('.')[0])
            queue_dict[job_id] = {k: job[i] for i, k in enumerate(keys)}
return queue_dict
@abstractmethod
def query_queue(self, job_name=None, user=None, qformat=None,
skip_rows=None):
"""Run the HPC queue command and return the raw stdout string.
Parameters
----------
job_name : str | None
            Optional to check the queue for a specific job name (not
            limited to the number of characters shown in the queue
            printout) or None to show the user's whole queue.
user : str | None
HPC username. None will get your username using getpass.getuser()
qformat : str | None
            Queue format string specification. Changing this from the
            default (None) could have adverse effects!
skip_rows : int | list | None
Optional row index values to skip.
Returns
-------
stdout : str
HPC queue output string. Can be split on line breaks to get list.
"""
@property
def queue(self):
"""Get the HPC queue parsed into dict format keyed by integer job id
Returns
-------
queue : dict
HPC queue parsed into dictionary format keyed by integer job id
with nested dictionary of job properties (queue printout columns).
"""
if self._queue is None:
qstr = self.query_queue(user=self._user)
self._queue = self.parse_queue_str(qstr)
return self._queue
@property
def queue_job_names(self):
"""Get a list of the job names in the queue"""
return [attrs[self.QCOL_NAME] for attrs in self.queue.values()]
@property
def queue_job_ids(self):
"""Get a list of the job integer ids in the queue"""
return list(self.queue.keys())
def check_status(self, job_id=None, job_name=None):
"""Check the status of an HPC job using the HPC queue.
Parameters
----------
job_id : int | None
Job integer ID number (preferred input)
        job_name : str | None
            Job name string.
        Returns
        -------
        status : str | None
            Queue job status string or None if not found.
            SLURM status strings: PD, R, CG (pending, running, completing).
            PBS status strings: Q, R, C (queued, running, completed).
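
        Examples
        --------
        A minimal sketch using a pre-parsed queue dict (values are made
        up):

        >>> queue = {123: {'JOBID': '123', 'NAME': 'job1', 'ST': 'R'}}
        >>> SLURM(queue_dict=queue).check_status(job_id=123)
        'R'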
"""
status = None
if job_id is not None:
if int(job_id) in self.queue:
status = self.queue[int(job_id)][self.QCOL_STATUS]
elif job_name is not None:
if job_name in self.queue_job_names:
for attrs in self.queue.values():
if attrs[self.QCOL_NAME] == job_name:
status = attrs[self.QCOL_STATUS]
break
else:
msg = 'Need a job_id or job_name to check HPC job status!'
logger.error(msg)
raise HpcError(msg)
return status
class PBS(HpcJobManager):
"""Subclass for PBS subprocess jobs."""
# PBS qstat column headers
QCOL_NAME = 'Name' # Job name column
QCOL_ID = 'Job id' # Job integer ID column
QCOL_STATUS = 'S' # Job status column
    # Frozen PBS qstat column headers, cached because the qstat header
    # row is not strictly space-delimited
QSTAT_KEYS = ('Job id', 'Name', 'User', 'Time Use', 'S', 'Queue')
# set a max job name length, will raise error if too long.
MAX_NAME_LEN = 100
# default rows to skip in queue stdout
QSKIP = (0, 1)
def __init__(self, user=None, queue_dict=None):
"""
Parameters
----------
user : str | None
HPC username. None will get your username using getpass.getuser()
queue_dict : dict | None
Parsed HPC queue dictionary (qstat for PBS or squeue for SLURM)
from parse_queue_str(). None will get the queue from PBS or SLURM.
"""
super().__init__(user=user, queue_dict=queue_dict)
@classmethod
def query_queue(cls, job_name=None, user=None, qformat=None,
skip_rows=None):
"""Run the PBS qstat command and return the raw stdout string.
Parameters
----------
        job_name : str | None
            Optional job name to check for (not used by this PBS
            implementation; accepted for signature compatibility with
            query_queue).
        user : str | None
            HPC username. None will get your username using getpass.getuser()
        qformat : str | None
            Queue format string specification (also not used by this PBS
            implementation).
skip_rows : int | list | None
Optional row index values to skip.
Returns
-------
stdout : str
qstat output string. Can be split on line breaks to get list.
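
        Examples
        --------
        Illustrative usage; this runs qstat on the HPC system, so it is
        skipped by doctest:

        >>> stdout = PBS.query_queue()  # doctest: +SKIP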
"""
if user is None:
user = cls.USER
if skip_rows is None:
skip_rows = cls.QSKIP
cmd = 'qstat -u {user}'.format(user=user)
stdout, _ = cls.submit(cmd)
stdout = cls._skip_q_rows(stdout, skip_rows)
return stdout
@property
def queue(self):
"""Get the HPC queue parsed into dict format keyed by integer job id
Returns
-------
queue : dict
HPC queue parsed into dictionary format keyed by integer job id
with nested dictionary of job properties (queue printout columns).
"""
if self._queue is None:
qstr = self.query_queue(user=self._user)
self._queue = self.parse_queue_str(qstr, keys=self.QSTAT_KEYS)
return self._queue
def qsub(self, cmd, alloc, queue, name='reV', feature=None,
stdout_path='./stdout', keep_sh=False):
"""Submit a PBS job via qsub command and PBS shell script
Parameters
----------
cmd : str
Command to be submitted in PBS shell script. Example:
'python -m reV.generation.cli_gen'
alloc : str
HPC allocation account. Example: 'rev'.
queue : str
HPC queue to submit job to. Example: 'short', 'batch-h', etc...
name : str
PBS job name.
feature : str | None
PBS feature request (-l {feature}).
Example: 'feature=24core', 'qos=high', etc...
stdout_path : str
Path to print .stdout and .stderr files.
keep_sh : bool
Boolean to keep the .sh files. Default is to remove these files
after job submission.
Returns
-------
        out : str
            qsub standard output; this is typically the PBS job ID.
        err : str
            qsub standard error; this is typically an empty string if
            the job was submitted successfully.
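
        Examples
        --------
        Illustrative usage; the command, allocation, queue, and name
        below are placeholders, and this would submit a real job, so it
        is skipped by doctest:

        >>> out, err = PBS().qsub('python my_script.py',  # doctest: +SKIP
        ...                       alloc='myalloc', queue='short',
        ...                       name='my_job')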
"""
if len(name) > self.MAX_NAME_LEN:
msg = ('Cannot submit job with name longer than {} chars: "{}"'
.format(self.MAX_NAME_LEN, name))
logger.error(msg)
raise ValueError(msg)
status = self.check_status(job_name=name)
if status in ('Q', 'R'):
logger.info('Not submitting job "{}" because it is already in '
'qstat with status: "{}"'.format(name, status))
out = None
err = 'already_running'
else:
self.make_path(stdout_path)
feature_str = '#PBS -l {}\n'.format(str(feature).replace(' ', ''))
fname = '{}.sh'.format(name)
script = ('#!/bin/bash\n'
'#PBS -N {n} # job name\n'
'#PBS -A {a} # allocation account\n'
'#PBS -q {q} # queue (debug, short, batch, or long)\n'
'#PBS -o {p}/{n}_$PBS_JOBID.o\n'
'#PBS -e {p}/{n}_$PBS_JOBID.e\n'
'{L}'
'echo Running on: $HOSTNAME, Machine Type: $MACHTYPE\n'
'{cmd}'
.format(n=name, a=alloc, q=queue, p=stdout_path,
L=feature_str if feature else '',
cmd=cmd))
# write the shell script file and submit as qsub job
self.make_sh(fname, script)
out, err = self.submit('qsub {script}'.format(script=fname))
if not keep_sh:
self.rm(fname)
if err:
msg = 'Received a PBS error or warning: {}'.format(err)
logger.warning(msg)
warn(msg, PbsWarning)
else:
                logger.debug('PBS job "{}" with id #{} submitted successfully'
                             .format(name, out))
                # qsub may print a full id like "1234.server"; keep the int
                job_id = int(str(out).strip().split('.')[0])
                self._queue[job_id] = {self.QCOL_ID: job_id,
                                       self.QCOL_NAME: name,
                                       self.QCOL_STATUS: 'Q'}
return out, err
class SLURM(HpcJobManager):
"""Subclass for SLURM subprocess jobs."""
# SLURM squeue column headers
QCOL_NAME = 'NAME' # Job name column
QCOL_ID = 'JOBID' # Job integer ID column
QCOL_STATUS = 'ST' # Job status column
MAX_NAME_LEN = 100
SQ_FORMAT = ("%.15i %.30P %.{}j %.20u %.10t %.15M %.25R %q"
.format(MAX_NAME_LEN))
# default rows to skip in queue stdout
QSKIP = None
def __init__(self, user=None, queue_dict=None):
"""
Parameters
----------
user : str | None
HPC username. None will get your username using getpass.getuser()
queue_dict : dict | None
Parsed HPC queue dictionary (qstat for PBS or squeue for SLURM)
from parse_queue_str(). None will get the queue from PBS or SLURM.
"""
super().__init__(user=user, queue_dict=queue_dict)
@classmethod
def query_queue(cls, job_name=None, user=None, qformat=None,
skip_rows=None):
"""Run the HPC queue command and return the raw stdout string.
Parameters
----------
job_name : str | None
            Optional to check the squeue for a specific job name (not
            limited to the 8 displayed characters) or None to show the
            user's whole queue.
user : str | None
HPC username. None will get your username using getpass.getuser()
qformat : str | None
            Queue format string specification. Changing this from the
            default (None) could have adverse effects!
skip_rows : int | list | None
Optional row index values to skip.
Returns
-------
stdout : str
HPC queue output string. Can be split on line breaks to get list.
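
        Examples
        --------
        Illustrative usage; this runs squeue on the HPC system (the
        username is a placeholder), so it is skipped by doctest:

        >>> stdout = SLURM.query_queue(user='jdoe')  # doctest: +SKIP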
"""
job_name_str = ''
if job_name is not None:
job_name_str = ' -n {}'.format(job_name)
if user is None:
user = cls.USER
if qformat is None:
qformat = cls.SQ_FORMAT
if skip_rows is None:
skip_rows = cls.QSKIP
cmd = ('squeue -u {user}{job_name} --format="{format_str}"'
.format(user=user, job_name=job_name_str, format_str=qformat))
stdout, _ = cls.submit(cmd)
stdout = cls._skip_q_rows(stdout, skip_rows)
return stdout
@staticmethod
def scontrol(cmd):
"""Submit an scontrol command.
Parameters
----------
cmd : str
Command string after "scontrol" word
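
        Examples
        --------
        Illustrative usage (the job id is a placeholder):

        >>> SLURM.scontrol('hold 12345')  # doctest: +SKIP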
"""
cmd = 'scontrol {}'.format(cmd)
cmd = shlex.split(cmd)
subprocess.call(cmd)
def scancel(self, arg):
"""Cancel a slurm job.
Parameters
----------
arg : int | list | str
SLURM integer job id(s) to cancel. Can be a list of integer
job ids, 'all' to cancel all jobs, or a feature (-p short) to
cancel all jobs with a given feature
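
        Examples
        --------
        Illustrative usage (the job id is a placeholder):

        >>> SLURM().scancel(12345)  # doctest: +SKIP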
"""
if isinstance(arg, (list, tuple)):
for job_id in arg:
self.scancel(job_id)
elif str(arg).lower() == 'all':
self._queue = None
for job_id in self.queue_job_ids:
self.scancel(job_id)
elif isinstance(arg, (int, str)):
cmd = ('scancel {}'.format(arg))
cmd = shlex.split(cmd)
subprocess.call(cmd)
else:
e = ('Could not cancel: {} with type {}'
.format(arg, type(arg)))
logger.error(e)
raise ExecutionError(e)
def change_qos(self, arg, qos):
"""Change the priority (quality of service) for a job.
Parameters
----------
arg : int | list | str
SLURM integer job id(s) to change qos for.
Can be 'all' for all jobs.
qos : str
New qos value
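
        Examples
        --------
        Illustrative usage (the job id and qos value are placeholders):

        >>> SLURM().change_qos(12345, 'high')  # doctest: +SKIP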
"""
if isinstance(arg, (list, tuple)):
for job_id in arg:
self.change_qos(job_id, qos)
elif isinstance(arg, int):
cmd = 'update job {} QOS={}'.format(arg, qos)
self.scontrol(cmd)
elif str(arg).lower() == 'all':
self._queue = None
for job_id, attrs in self.queue.items():
status = attrs[self.QCOL_STATUS].lower()
if status == 'pd':
self.change_qos(job_id, qos)
else:
e = ('Could not change qos of: {} with type {}'
.format(arg, type(arg)))
logger.error(e)
raise ExecutionError(e)
def hold(self, arg):
"""Temporarily hold a job from submitting. Held jobs will stay in queue
but will not get nodes until released.
Parameters
----------
arg : int | list | str
SLURM integer job id(s) to hold. Can be 'all' to hold all jobs.
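
        Examples
        --------
        Illustrative usage (the job id is a placeholder):

        >>> SLURM().hold(12345)  # doctest: +SKIP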
"""
if isinstance(arg, (list, tuple)):
for job_id in arg:
self.hold(job_id)
elif isinstance(arg, int):
cmd = 'hold {}'.format(arg)
self.scontrol(cmd)
elif str(arg).lower() == 'all':
self._queue = None
for job_id, attrs in self.queue.items():
status = attrs[self.QCOL_STATUS].lower()
if status == 'pd':
self.hold(job_id)
else:
e = ('Could not hold: {} with type {}'
.format(arg, type(arg)))
logger.error(e)
raise ExecutionError(e)
def release(self, arg):
"""Release a job that was previously on hold so it will be submitted
to a compute node.
Parameters
----------
arg : int | list | str
SLURM integer job id(s) to release.
Can be 'all' to release all jobs.
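
        Examples
        --------
        Illustrative usage (the job id is a placeholder):

        >>> SLURM().release(12345)  # doctest: +SKIP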
"""
if isinstance(arg, (list, tuple)):
for job_id in arg:
self.release(job_id)
elif isinstance(arg, int):
cmd = 'release {}'.format(arg)
self.scontrol(cmd)
elif str(arg).lower() == 'all':
self._queue = None
for job_id, attrs in self.queue.items():
status = attrs[self.QCOL_STATUS].lower()
reason = attrs['NODELIST(REASON)'].lower()
if status == 'pd' and 'jobheld' in reason:
self.release(job_id)
else:
e = ('Could not release: {} with type {}'
.format(arg, type(arg)))
logger.error(e)
raise ExecutionError(e)
@staticmethod
def _special_cmd_strs(feature, memory, module, module_root, conda_env):
"""Get special sbatch request strings for SLURM features, memory,
modules, and conda environments
Parameters
----------
        feature : str | None
            Additional flags for SLURM job. Format is "--qos=high"
            or "--depend=[state:job_id]". Default is None.
        memory : int | None
            Node memory request in GB.
        module : str | None
            Module name to load
        module_root : str
            Path to module root to load
        conda_env : str | None
            Conda environment to activate
Returns
-------
feature_str : str
SBATCH shell script feature request string.
mem_str : str
SBATCH shell script memory request string.
env_str : str
SBATCH shell script module load or source activate environment
request string.
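
        Examples
        --------
        A minimal sketch requesting only an extra feature (the flag
        value is illustrative):

        >>> fstr, mstr, estr = SLURM._special_cmd_strs(
        ...     '--qos=high', None, None, None, None)
        >>> print(fstr, end='')
        #SBATCH --qos=high # extra feature
        >>> mstr, estr
        ('', '')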
"""
feature_str = ''
if feature is not None:
feature_str = '#SBATCH {} # extra feature\n'.format(feature)
mem_str = ''
if memory is not None:
mem_str = ('#SBATCH --mem={} # node RAM in MB\n'
.format(int(memory * 1000)))
env_str = ''
if module is not None:
env_str = ("echo module use {module_root}\n"
"module use {module_root}\n"
"echo module load {module}\n"
"module load {module}\n"
"echo module load complete!\n"
.format(module_root=module_root, module=module))
elif conda_env is not None:
env_str = ("echo source activate {conda_env}\n"
"source activate {conda_env}\n"
"echo conda env activate complete!\n"
.format(conda_env=conda_env))
return feature_str, mem_str, env_str
@staticmethod
def _job_id_or_out(out):
"""Check stdout for a job id and return just the job id if present,
otherwise return full stdout.
Parameters
----------
out : str
stdout from self.submit()
Returns
-------
stdout : str
If job id is present in out then this is just the job id otherwise
it is the full stdout
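
        Examples
        --------
        Illustrative sbatch-style stdout:

        >>> SLURM._job_id_or_out('Submitted batch job 12345')
        '12345'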
"""
stdout = re.sub("[^0-9]", "", str(out))
if not stdout:
stdout = out
return stdout
def sbatch(self, cmd, alloc=None, walltime=None, memory=None, nodes=1,
feature=None, name='reV', stdout_path='./stdout', keep_sh=False,
conda_env=None, module=None,
module_root='/shared-projects/rev/modulefiles'):
"""Submit a SLURM job via sbatch command and SLURM shell script
Parameters
----------
cmd : str
Command to be submitted in SLURM shell script. Example:
'python -m reV.generation.cli_gen'
        alloc : str | None
            HPC project (allocation) handle. Example: 'rev'. Default is
            None, which does not state an allocation (this does not work
            on the Eagle SLURM system).
        walltime : float | None
            Node walltime request in hours. Default is None, which does
            not state a walltime (this does not work on the Eagle SLURM
            system).
memory : int
Node memory request in GB.
nodes : int
Number of nodes to use for this sbatch job. Default is 1.
feature : str
Additional flags for SLURM job. Format is "--qos=high"
or "--depend=[state:job_id]". Default is None.
name : str
SLURM job name.
stdout_path : str
Path to print .stdout and .stderr files.
keep_sh : bool
Boolean to keep the .sh files. Default is to remove these files
after job submission.
        conda_env : str | None
            Conda environment to activate
        module : str | None
            Module name to load
        module_root : str
            Path to module root to load
Returns
-------
        out : str
            sbatch standard output; if the job was submitted
            successfully, this is the SLURM job id.
        err : str
            sbatch standard error; this is typically an empty string if
            the job was submitted successfully.
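
        Examples
        --------
        Illustrative usage; the command, allocation, and name below are
        placeholders, and this would submit a real job, so it is skipped
        by doctest:

        >>> out, err = SLURM().sbatch('python my_script.py',  # doctest: +SKIP
        ...                           alloc='myalloc', walltime=1.0,
        ...                           name='my_job')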
"""
if len(name) > self.MAX_NAME_LEN:
msg = ('Cannot submit job with name longer than {} chars: "{}"'
.format(self.MAX_NAME_LEN, name))
logger.error(msg)
raise ValueError(msg)
status = self.check_status(job_name=name)
if status is not None:
logger.info('Not submitting job "{}" because it is in '
'squeue or has been recently submitted'.format(name))
out = None
err = 'already_running'
else:
fname = '{}.sh'.format(name)
self.make_path(stdout_path)
# make all the sbatch arguments
sb_a = f'#SBATCH --account={alloc}' if alloc is not None else ''
walltime = self.format_walltime(walltime)
sb_t = f'#SBATCH --time={walltime}' if walltime is not None else ''
sb_jn = f'#SBATCH --job-name={name} # job name'
sb_no = f'#SBATCH --nodes={nodes} # number of nodes'
sb_out = f'#SBATCH --output={stdout_path}/{name}_%j.o'
sb_err = f'#SBATCH --error={stdout_path}/{name}_%j.e'
sbf, sbm, env_str = self._special_cmd_strs(feature, memory, module,
module_root, conda_env)
script_args = ['#!/bin/bash']
sb_args = (sb_a, sb_t, sb_jn, sb_no, sb_out, sb_err, sbf, sbm,
env_str)
for sb_arg in sb_args:
if sb_arg:
script_args.append(sb_arg)
script_args.append('echo Running on: $HOSTNAME, '
'Machine Type: $MACHTYPE')
script_args.append(cmd)
script = '\n'.join(script_args)
            # write the shell script file and submit as an sbatch job
self.make_sh(fname, script)
out, err = self.submit('sbatch {script}'.format(script=fname))
out = self._job_id_or_out(out)
if not keep_sh:
self.rm(fname)
if err:
msg = 'Received a SLURM error or warning: {}'.format(err)
logger.warning(msg)
warn(msg, SlurmWarning)
else:
                # out was reduced to the bare job id by _job_id_or_out above
                job_id = int(out)
out = str(job_id)
logger.debug('SLURM job "{}" with id #{} submitted '
'successfully'.format(name, job_id))
self._queue[job_id] = {self.QCOL_ID: job_id,
self.QCOL_NAME: name,
self.QCOL_STATUS: 'PD'}
return out, err