# -*- coding: utf-8 -*-
"""
Execution utilities.
"""
import multiprocessing
import concurrent.futures as cf
import subprocess
import logging
import gc
from math import floor
import os
import psutil
import shlex
from warnings import warn
from rex.utilities.loggers import LOGGERS, log_mem
from rex.utilities.exceptions import ExecutionError, ParallelExecutionWarning
logger = logging.getLogger(__name__)
class SubprocessManager:
"""Base class to handle subprocess execution."""
@staticmethod
def make_path(d):
"""Make a directory tree if it doesn't exist.
Parameters
----------
d : str
Directory tree to check and potentially create.
"""
if not os.path.exists(d):
os.makedirs(d)
@staticmethod
def make_sh(fname, script):
"""Make a shell script (.sh file) to execute a subprocess.
Parameters
----------
fname : str
Name of the .sh file to create.
script : str
Contents to be written into the .sh file.
"""
logger.debug('The shell script "{n}" contains the following:\n'
'~~~~~~~~~~ {n} ~~~~~~~~~~\n'
'{s}\n'
'~~~~~~~~~~ {n} ~~~~~~~~~~'
.format(n=fname, s=script))
with open(fname, 'w+') as f:
f.write(script)
@staticmethod
def rm(fname):
"""Remove a file.
Parameters
----------
fname : str
Filename (with path) to remove.
"""
os.remove(fname)
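# Illustrative sketch (not part of the class): writing a small shell script
# with make_sh and cleaning it up with rm. The filename and script contents
# below are assumptions for demonstration only.
#
#     >>> SubprocessManager.make_sh('run_example.sh', '#!/bin/bash\necho hi')
#     >>> SubprocessManager.rm('run_example.sh')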
@staticmethod
def _subproc_popen(cmd):
"""Open a subprocess popen constructor and submit a command.
Parameters
----------
cmd : str
Command to be submitted using python subprocess.
Returns
-------
stdout : str
Subprocess standard output. This is decoded from the subprocess
stdout with rstrip.
stderr : str
Subprocess standard error. This is decoded from the subprocess
stderr with rstrip. After decoding/rstrip, this will be empty if
the subprocess doesn't return an error.
"""
cmd = shlex.split(cmd)
# use subprocess to submit command and get piped o/e
process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
stderr = stderr.decode('ascii').rstrip()
stdout = stdout.decode('ascii').rstrip()
if process.returncode != 0:
raise OSError('Subprocess submission failed with return code {} '
'and stderr:\n{}'
.format(process.returncode, stderr))
return stdout, stderr
@staticmethod
def _subproc_run(cmd, background=False, background_stdout=False,
shell=True):
"""Open a subprocess and submit a command.
Parameters
----------
cmd : str
Command to be submitted using python subprocess.
background : bool
Flag to submit subprocess in the background. stdout and stderr will
be empty strings if this is True.
background_stdout : bool
Flag to capture the stdout/stderr from the background process
in a nohup.out file.
shell : bool
Flag to submit the command through the shell. Note that this is
forced to True whenever the command is run in the background.
"""
nohup_cmd = None
if background and background_stdout:
nohup_cmd = 'nohup {} &'
elif background and not background_stdout:
nohup_cmd = 'nohup {} </dev/null >/dev/null 2>&1 &'
if nohup_cmd is not None:
cmd = nohup_cmd.format(cmd)
shell = True
subprocess.run(cmd, shell=shell, check=True)
@staticmethod
def submit(cmd, background=False, background_stdout=False):
"""Open a subprocess and submit a command.
Parameters
----------
cmd : str
Command to be submitted using python subprocess.
background : bool
Flag to submit subprocess in the background. stdout and stderr will
be empty strings if this is True.
background_stdout : bool
Flag to capture the stdout/stderr from the background process
in a nohup.out file.
Returns
-------
stdout : str
Subprocess standard output. This is decoded from the subprocess
stdout with rstrip.
stderr : str
Subprocess standard error. This is decoded from the subprocess
stderr with rstrip. After decoding/rstrip, this will be empty if
the subprocess doesn't return an error.
"""
if background:
SubprocessManager._subproc_run(
cmd, background=background,
background_stdout=background_stdout)
stdout, stderr = '', ''
else:
stdout, stderr = SubprocessManager._subproc_popen(cmd)
return stdout, stderr
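# Illustrative sketch (not part of the class): submitting a simple command in
# the foreground and capturing its output. The command strings below are
# assumptions for demonstration only.
#
#     >>> stdout, stderr = SubprocessManager.submit('echo hello')
#     >>> stdout
#     'hello'
#     >>> stderr
#     ''
#
# Background submission returns empty strings immediately and leaves the
# process running under nohup:
#
#     >>> SubprocessManager.submit('sleep 10', background=True)
#     ('', '')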
@staticmethod
def s(s):
"""Format input as str w/ appropriate quote types for python cli entry.
Examples
--------
list, tuple -> "['one', 'two']"
dict -> "{'key': 'val'}"
int, float, None -> '0'
str, other -> 'string'
"""
if isinstance(s, (list, tuple, dict)):
return '"{}"'.format(s)
elif not isinstance(s, (int, float, type(None))):
return "'{}'".format(s)
else:
return '{}'.format(s)
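# Illustrative sketch (not part of the class): formatting python objects for
# CLI entry with s(). The input values are assumptions for demonstration.
#
#     >>> print(SubprocessManager.s(['one', 'two']))
#     "['one', 'two']"
#     >>> print(SubprocessManager.s(None))
#     None
#     >>> print(SubprocessManager.s('string'))
#     'string'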
class SpawnProcessPool(cf.ProcessPoolExecutor):
"""An adaptation of concurrent futures ProcessPoolExecutor with
spawn processes instead of fork or forkserver."""
def __init__(self, *args, loggers=None, **kwargs):
"""
Parameters
----------
loggers : str | list, optional
logger(s) to initialize on workers, by default None
"""
if 'mp_context' in kwargs:
w = ('SpawnProcessPool being initialized with mp_context: "{}". '
'This will override default SpawnProcessPool behavior.'
.format(kwargs['mp_context']))
logger.warning(w)
warn(w, ParallelExecutionWarning)
else:
kwargs['mp_context'] = multiprocessing.get_context('spawn')
if loggers is not None:
kwargs['initializer'] = LOGGERS.init_loggers
kwargs['initargs'] = (loggers, )
super().__init__(*args, **kwargs)
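# Illustrative sketch (not part of the class): SpawnProcessPool is a drop-in
# replacement for concurrent.futures.ProcessPoolExecutor that uses the
# "spawn" start method, so submitted functions must be importable (defined at
# module level, not in an interactive session). The worker function, logger
# name, and import path below are assumptions for demonstration.
#
#     from rex.utilities.execution import SpawnProcessPool
#
#     def square(x):
#         return x ** 2
#
#     if __name__ == '__main__':
#         with SpawnProcessPool(max_workers=2, loggers='rex') as exe:
#             futures = [exe.submit(square, i) for i in range(4)]
#             print([f.result() for f in futures])  # [0, 1, 4, 9]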
def execute_parallel(fun, execution_iter, n_workers=None, **kwargs):
"""Execute concurrent futures with an established cluster.
Parameters
----------
fun : function
Python function object that will be submitted to futures. See
downstream execution methods for arg passing structure.
execution_iter : iter
Python iterator that controls the futures submitted in parallel.
n_workers : int
Number of workers to run in parallel
**kwargs : dict
Key word arguments passed to the fun.
Returns
-------
results : list
List of futures results.
"""
futures = []
# initialize a local spawn-based process pool executor
with SpawnProcessPool(max_workers=n_workers) as executor:
# iterate through split executions, submitting each to worker
for i, exec_slice in enumerate(execution_iter):
logger.debug('Kicking off serial worker #{} for: {}'
.format(i, exec_slice))
# submit executions and append to futures list
futures.append(executor.submit(execute_single, fun, exec_slice,
worker=i, **kwargs))
# gather results
results = [future.result() for future in futures]
return results
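# Illustrative sketch (not part of the function): running execute_parallel
# over a small iterable. The worker function and inputs are assumptions for
# demonstration; fun must be importable so it can be pickled to the spawned
# workers.
#
#     def double(x, scale=1):
#         return 2 * x * scale
#
#     if __name__ == '__main__':
#         out = execute_parallel(double, range(4), n_workers=2, scale=10)
#         print(out)  # [0, 20, 40, 60]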
def execute_single(fun, input_obj, worker=0, **kwargs):
"""Execute a serial compute on a single core.
Parameters
----------
fun : function
Function to execute.
input_obj : object
Object passed as first argument to fun. Typically a project control
object that can be the result of iteration in the parallel execution
framework.
worker : int
Worker number (for debugging purposes).
**kwargs : dict
Key word arguments passed to fun.
"""
logger.debug('Running single serial execution on worker #{} for: {}'
.format(worker, input_obj))
out = fun(input_obj, **kwargs)
log_mem()
return out
class SmartParallelJob:
"""Single node parallel compute manager with smart data flushing."""
def __init__(self, obj, execution_iter, n_workers=None, mem_util_lim=0.7):
"""Single node parallel compute manager with smart data flushing.
Parameters
----------
obj : object
Python object that will be submitted to futures. Must have methods
run(arg) and flush(). run(arg) must take the iteration result of
execution_iter as the single positional argument. Additionally,
the results of obj.run(arg) will be passed to obj.out. obj.out
will be passed None when the memory is to be cleared. It is
advisable that obj.run() be a @staticmethod for dramatically
faster submission in parallel.
execution_iter : iter
Python iterator that controls the futures submitted in parallel.
n_workers : int
Number of workers to use in parallel. None will use all
available workers.
mem_util_lim : float
Memory utilization limit (fractional). If the used memory divided
by the total memory is greater than this value, the obj.out will
be flushed and the local node memory will be cleared.
"""
if not hasattr(obj, 'run') or not hasattr(obj, 'flush'):
raise ExecutionError('Parallel execution with object: "{}" '
'failed. The target object must have methods '
'run() and flush()'.format(obj))
self._obj = obj
self._execution_iter = execution_iter
self._n_workers = n_workers
self._mem_util_lim = mem_util_lim
@property
def execution_iter(self):
"""Get the iterator object that controls the parallel execution.
Returns
-------
_execution_iter : iterable
Iterable object that controls the processes of the parallel job.
"""
return self._execution_iter
@property
def mem_util_lim(self):
"""Get the memory utilization limit (fractional).
Returns
-------
_mem_util_lim : float
Fractional memory utilization limit. If the used memory divided
by the total memory is greater than this value, the obj.out will
be flushed and the local node memory will be cleared.
"""
return self._mem_util_lim
@property
def n_workers(self):
"""Get the number of workers in the local cluster.
Returns
-------
_n_workers : int
Number of workers. Default value is the number of CPUs.
"""
if self._n_workers is None:
self._n_workers = os.cpu_count()
return self._n_workers
@property
def obj(self):
"""Get the main python object that will be submitted to futures.
Returns
-------
_obj : Object
Python object that will be submitted to futures. Must have methods
run(arg) and flush(). run(arg) must take the iteration result of
execution_iter as the single positional argument. Additionally,
the results of obj.run(arg) will be passed to obj.out. obj.out
will be passed None when the memory is to be cleared. It is
advisable that obj.run() be a @staticmethod for dramatically
faster submission in parallel.
"""
return self._obj
def flush(self):
"""Flush obj.out to disk, set obj.out=None, and garbage collect."""
# memory utilization limit exceeded, flush memory to disk
self.obj.flush()
self.obj.out = None
gc.collect()
def gather_and_flush(self, i, futures, force_flush=False):
"""Wait on futures, potentially update obj.out and flush to disk.
Parameters
----------
i : int | str
Iteration number (for logging purposes).
futures : list
List of parallel future objects to wait on or gather.
force_flush : bool
Option to force a disk flush. Useful for end-of-iteration. If this
is False, will only flush to disk if the memory utilization exceeds
the mem_util_lim.
Returns
-------
futures : list
List of parallel future objects. If the memory was flushed, this is
a cleared list: futures.clear()
"""
# gather on each iteration so there is no big mem spike during flush
# (obj.out should be a property setter that will append new data.)
self.obj.out = [future.result() for future in futures]
futures.clear()
# useful log statements
mem = psutil.virtual_memory()
logger.info('Parallel run at iteration {0}. '
'Memory utilization is {1:.3f} GB out of {2:.3f} GB '
'total ({3:.1f}% used, limit of {4:.1f}%)'
.format(i, mem.used / 1e9, mem.total / 1e9,
100 * mem.used / mem.total,
100 * self.mem_util_lim))
# check memory utilization against the limit
if ((mem.used / mem.total) >= self.mem_util_lim) or force_flush:
# memory limit exceeded or flush forced: write obj.out to disk and
# clear it to free up memory
logger.info('Flushing memory to disk. The memory utilization is '
'{0:.2f}% and the limit is {1:.2f}%.'
.format(100 * (mem.used / mem.total),
100 * self.mem_util_lim))
self.flush()
return futures
def run(self, **kwargs):
"""
Run ParallelSmartJobs
Parameters
----------
kwargs : dict
Keyword arguments to be passed to obj.run(). Makes it easier to
have obj.run() as a @staticmethod.
"""
logger.info('Executing parallel run on a local cluster with '
'{0} workers over {1} total iterations.'
.format(self.n_workers, 1 + len(self.execution_iter)))
log_mem()
# initialize a local spawn-based process pool executor
with SpawnProcessPool(max_workers=self.n_workers) as executor:
futures = []
# iterate through split executions, submitting each to worker
for i, exec_slice in enumerate(self.execution_iter):
logger.debug('Kicking off serial worker #{0} for: {1}. '
.format(i, exec_slice))
# submit executions and append to futures list
futures.append(executor.submit(self.obj.run, exec_slice,
**kwargs))
# Take a pause after one complete set of workers
if (i + 1) % self.n_workers == 0:
futures = self.gather_and_flush(i, futures)
# All futures complete
self.gather_and_flush('END', futures, force_flush=True)
logger.debug('Smart parallel job complete. Returning execution '
'control to higher level processes.')
log_mem()
@classmethod
def execute(cls, obj, execution_iter, n_workers=None,
mem_util_lim=0.7, **kwargs):
"""Execute the smart parallel run with data flushing.
Parameters
----------
obj : object
Python object that will be submitted to futures. Must have methods
run(arg) and flush(). run(arg) must take the iteration result of
execution_iter as the single positional argument. Additionally,
the results of obj.run(arg) will be passed to obj.out. obj.out
will be passed None when the memory is to be cleared. It is
advisable that obj.run() be a @staticmethod for dramatically
faster submission in parallel.
execution_iter : iter
Python iterator that controls the futures submitted in parallel.
n_workers : int
Number of workers to scale the cluster to. None will use all
available workers in a local cluster.
mem_util_lim : float
Memory utilization limit (fractional). If the used memory divided
by the total memory is greater than this value, the obj.out will
be flushed and the local node memory will be cleared.
kwargs : dict
Keyword arguments to be passed to obj.run(). Makes it easier to
have obj.run() as a @staticmethod.
"""
manager = cls(obj, execution_iter, n_workers=n_workers,
mem_util_lim=mem_util_lim)
manager.run(**kwargs)
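# Illustrative sketch (not part of the class): a minimal object satisfying the
# run()/flush() interface expected by SmartParallelJob. The class name,
# attributes, and file path below are assumptions for demonstration only.
#
#     class Accumulator:
#         """Collects results in memory and periodically flushes to disk."""
#
#         def __init__(self, fpath):
#             self._fpath = fpath
#             self._out = []
#
#         @property
#         def out(self):
#             return self._out
#
#         @out.setter
#         def out(self, results):
#             # None signals that memory is being cleared after a flush
#             if results is None:
#                 self._out = []
#             else:
#                 self._out.extend(results)
#
#         @staticmethod
#         def run(arg):
#             # compute on a single chunk of the execution iterable
#             return arg ** 2
#
#         def flush(self):
#             # append the in-memory results to disk
#             with open(self._fpath, 'a') as f:
#                 f.write('\n'.join(str(x) for x in self._out) + '\n')
#
#     if __name__ == '__main__':
#         obj = Accumulator('./smart_parallel_out.txt')
#         SmartParallelJob.execute(obj, range(8), n_workers=2)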