Dask
Dask is a framework for parallelizing Python code. The most common use case is enabling Python programmers to scale scientific and machine learning analyses to run on distributed hardware. Dask has similarities to Apache Spark (see the FAQ for a comparison), but Dask is more Python-native and interfaces with common scientific libraries such as NumPy and Pandas.
Installation
Warning
Conda environments should always be installed outside of your home directory for storage and performance reasons. This is especially important for frameworks like Dask, whose parallel processes can particularly strain the /home filesystem. Please refer to our dedicated conda documentation for more information on how to set up your conda environments to redirect installations outside of /home by default.
Dask can be installed via Conda/Mamba. For example, to install Dask into a new environment from conda-forge into your /projects allocation folder, first load the appropriate conda (or mamba) module (e.g., module load mamba on Kestrel), and then run the following on a compute node.
# Be sure to replace "<allocation_handle>" with your HPC project.
# interactive job
salloc -A <allocation_handle> -p debug -t 01:00:00
# load mamba module
ml mamba
# create and activate `dask-env` environment with Python 3.12
mamba create --prefix=/projects/<allocation_handle>/dask-env conda-forge::python=3.12 conda-forge::dask
conda activate /projects/<allocation_handle>/dask-env
This installs Dask along with common dependencies such as NumPy. Additionally, the dask-jobqueue package (discussed below) can be installed via:
mamba install conda-forge::dask-jobqueue
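To confirm that the environment is usable, a quick import check such as the following can be run from the activated environment. This is a minimal sketch that assumes only the packages installed above (dask and dask-jobqueue):

# Quick sanity check; run with `python` inside the activated dask-env environment.
import dask
import dask_jobqueue
from dask.distributed import Client, LocalCluster

# Print the installed versions to confirm the packages resolve correctly.
print(dask.__version__, dask_jobqueue.__version__)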
Further, there is the dask-mpi package (also discussed below). To ensure compatibility with the system MPI libraries, it is recommended to install dask-mpi using pip. As such, we recommend installing any conda packages first. dask-mpi depends on mpi4py; however, we have found that the pip install command does not automatically install mpi4py, so we install it explicitly. Installation of mpi4py links against the system libraries, so the desired MPI library should be loaded first. In addition, it may be necessary to explicitly specify the MPI compiler driver. For example, to install mpi4py on Kestrel using the Intel programming environment and its associated MPI (PrgEnv-intel), you would do the following:
module load PrgEnv-intel
MPICC=`which mpicc` pip install dask-mpi mpi4py
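To verify that mpi4py was built against the intended MPI library, a short check such as the following can be run under srun inside a job with the same programming environment loaded. This is a minimal sketch; the file name check_mpi.py is only illustrative:

# check_mpi.py -- run as, e.g., srun -n 2 python check_mpi.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
# Print which MPI implementation each rank is linked against, plus its rank and size.
print(f"rank {comm.Get_rank()} of {comm.Get_size()}: {MPI.Get_library_version().splitlines()[0]}")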
Dask single node
Dask can be used locally on your laptop or on an individual node. Additionally, it provides wrappers for multiprocessing and thread pools. One advantage of using LocalCluster is that you can easily drop in another cluster configuration to further parallelize, with minimal modification of the code.
The following is a simple example that uses a local cluster with the dask.delayed interface, which can be used when the problem doesn't fit into one of the built-in collection types such as dask.array or dask.dataframe:
Dask local cluster
from distributed import Client, LocalCluster
import dask
import time
import random

# Each @dask.delayed function builds a lazy task instead of executing immediately.
@dask.delayed
def inc(x):
    time.sleep(random.random())
    return x + 1

@dask.delayed
def dec(x):
    time.sleep(random.random())
    return x - 1

@dask.delayed
def add(x, y):
    time.sleep(random.random())
    return x + y

def main():
    # Start a local cluster with 2 worker processes and connect a client to it.
    cluster = LocalCluster(n_workers=2)
    client = Client(cluster)

    # Build the task graph lazily...
    zs = []
    for i in range(256):
        x = inc(i)
        y = dec(x)
        z = add(x, y)
        zs.append(z)

    # ...then execute it on the cluster.
    result = dask.compute(*zs)
    print(result)

if __name__ == "__main__":
    main()
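For problems that do fit one of the built-in collections, the same local cluster can be reused directly. The following is a minimal sketch using dask.array, assuming Dask was installed as described above; the array and chunk sizes are only illustrative:

# Minimal dask.array sketch reusing a LocalCluster (sizes are illustrative).
from distributed import Client, LocalCluster
import dask.array as da

def main():
    cluster = LocalCluster(n_workers=2)
    # Creating the client makes this cluster the default executor for compute().
    client = Client(cluster)

    # A 10000x10000 array split into 1000x1000 chunks processed in parallel.
    x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
    result = (x + x.T).mean().compute()
    print(result)

if __name__ == "__main__":
    main()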
Dask Jobqueue
The dask-jobqueue library makes it easy to deploy Dask on a distributed cluster using Slurm (via SLURMCluster). This is particularly useful when running an interactive notebook, where the workers can be scaled dynamically.
For the following example, first make sure that both dask and dask-jobqueue have been installed. Create a file named dask_slurm_example.py with the following contents, and replace <allocation_handle> with your project allocation.
Assuming you are on Kestrel, this example will request two jobs from the shared partition.
dask_slurm_example.py
from dask_jobqueue import SLURMCluster
import socket
from dask.distributed import Client
from collections import Counter

# Each Slurm job provides 18 cores split across 17 worker processes.
cluster = SLURMCluster(
    cores=18,
    memory='24GB',
    account='<allocation_handle>',
    walltime='00:30:00',
    processes=17,
    queue='shared',
)

client = Client(cluster)

def test():
    return socket.gethostname()

result = []
# Request two Slurm jobs' worth of workers.
cluster.scale(jobs=2)
for i in range(2000):
    result.append(client.submit(test).result())

# Count how many tasks ran on each host, and show the generated job script.
print(Counter(result))
print(cluster.job_script())
The script can then be executed directly from a login node:
python dask_slurm_example.py
Note that although 2 jobs are requested, Dask launches the jobs dynamically, so depending on the status of the job queue, your results may indicate that only a single node was used.
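For interactive workloads, the worker pool can also be scaled adaptively rather than fixed at a given number of jobs. The following is a minimal sketch assuming the same SLURMCluster configuration as above; the minimum and maximum worker bounds are only illustrative:

# Adaptive scaling sketch (same SLURMCluster configuration as in dask_slurm_example.py).
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(
    cores=18,
    memory='24GB',
    account='<allocation_handle>',
    walltime='00:30:00',
    processes=17,
    queue='shared',
)
client = Client(cluster)

# Let Dask add or remove workers based on the task load,
# instead of holding a fixed number of jobs with cluster.scale().
cluster.adapt(minimum=0, maximum=34)  # worker-count bounds; values are illustrative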
Dask MPI
Dask also provides a package called dask-mpi that uses MPI to create the cluster. Note that dask-mpi only uses MPI to start the cluster, not for inter-node communication.
Dask-MPI provides two interfaces for launching Dask: from a batch script using the Python API, or from the command line (a sketch of the command-line interface is given at the end of this section).
Here we show a simple example that uses Dask-MPI with a batch script. Make sure that you have installed dask-mpi following the installation instructions above. Create dask_mpi_example.py and dask_mpi_launcher.sh with the contents below. In dask_mpi_launcher.sh, replace <project> with your allocation and /path/to/dask-env with the full conda prefix path into which you installed Dask.
dask_mpi_example.py
from dask_mpi import initialize
from dask.distributed import Client
import socket
import time
from collections import Counter

def test():
    return socket.gethostname()

def main():
    # MPI rank 0 becomes the scheduler, rank 1 runs this client code,
    # and the remaining ranks become workers.
    initialize(nthreads=5)
    client = Client()
    time.sleep(15)

    result = []
    for i in range(0, 100):
        result.append(client.submit(test).result())
        time.sleep(1)

    # Count how many tasks ran on each host.
    out = str(Counter(result))
    print(f'nodes: {out}')

main()
dask_mpi_launcher.sh
#!/bin/bash
#SBATCH --nodes=2
#SBATCH --ntasks=4
#SBATCH --time=10
#SBATCH --account=<project>
ml mamba
conda activate /path/to/dask-env
srun -n 4 python dask_mpi_example.py
The job is then launched as:
sbatch dask_mpi_launcher.sh
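For reference, the command-line interface starts the scheduler and workers with the dask-mpi console script and shares their address through a scheduler file; a separate client script then connects to that file. The following is a minimal sketch under those assumptions (the file names are only illustrative); see the Dask MPI documentation for the full set of options:

# dask_mpi_cli_client.py -- client-side sketch for Dask-MPI's command-line interface.
# Assumes the cluster was started inside the Slurm job with something like:
#   srun -n 4 dask-mpi --scheduler-file=scheduler.json
# which writes the scheduler address to scheduler.json.
from dask.distributed import Client

client = Client(scheduler_file='scheduler.json')  # connect to the running cluster
print(client.submit(sum, [1, 2, 3]).result())     # simple round-trip check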
Warning
We have observed errors such as distributed.comm.core.CommClosedError when using dask-mpi. These errors may be related to known issues such as GitHub Issue #94. Users who experience issues with dask-mpi are encouraged to use dask-jobqueue instead.
References
Dask documentation
Dask Jobqueue
Dask MPI