Map a Python function to compute nodes

This tutorial will teach you how to build a workflow from Python functions instead of CLI executables and run it on an HPC with Slurm.

Workflow Description

Let’s suppose that your code is in a module called simulation.py and looks something like this:

def run(input_params: dict) -> dict:
    """Runs one simulation on a set of input parameters.

    Returns
    -------
    dict
        Result of the simulation.
    """
    job_name = input_params["job_name"]
    return {
        "inputs": input_params,
        "result": 5,
        "output_data_path": f"/projects/my-project/{job_name}",
    }


def postprocess(results: list[dict]) -> dict:
    """Collects the results of the workers and performs postprocessing.

    Parameters
    ----------
    results : list[dict]
        Results from each simulation

    Returns
    -------
    dict
        Final result
    """
    total = 0
    paths = []
    for result in results:
        assert "result" in result
        assert "output_data_path" in result
        total += result["result"]
        paths.append(result["output_data_path"])
    return {"total": total, "output_data_paths": paths}
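
Before handing these functions to torc, it can be worth a quick local sanity check. For example, you could append a block like this to simulation.py (a minimal sketch; the parameter values are illustrative):

if __name__ == "__main__":
    # Quick local check of run() and postprocess() before scaling out with torc.
    test_params = [{"job_name": "job1"}, {"job_name": "job2"}]
    results = [run(p) for p in test_params]
    final = postprocess(results)
    assert final["total"] == 10
    print(final)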

You need to run this function on hundreds of sets of input parameters and want torc to help you scale this work on an HPC.

The recommended approach for this task is to use torc’s Python API, as shown below. The goal is to mimic the behavior of Python’s concurrent.futures.ProcessPoolExecutor.map as closely as possible.

Similar functionality is also available with Dask.
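
For comparison, here is roughly the pattern being reproduced, shown with the standard library on a single machine (a sketch; the parameter values are illustrative). Torc applies the same map-then-reduce shape, but across Slurm compute nodes:

from concurrent.futures import ProcessPoolExecutor

from simulation import postprocess, run

params = [{"job_name": f"job{i}"} for i in range(3)]  # illustrative inputs
with ProcessPoolExecutor() as executor:
    results = list(executor.map(run, params))
final = postprocess(results)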

Resource Constraints

  • Each function call needs 4 CPUs and 20 GiB of memory.

  • The function call takes 1 hour to run.

On Eagle, the 92 GiB nodes are the easiest to acquire but can run only 4 of these jobs at a time. The 180 GiB nodes are fewer in number but would consume fewer AUs because each can run 8 jobs at a time.
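
A rough way to check that comparison (a sketch; the usable-memory figures are assumptions, since Slurm typically exposes somewhat less than the nominal node size):

# Concurrent jobs per node type, limited here by memory rather than CPUs.
job_mem_gib = 20
usable_mem_gib = {"92 GiB node": 90, "180 GiB node": 176}  # assumed Slurm-usable memory
for node, mem in usable_mem_gib.items():
    print(node, mem // job_mem_gib, "jobs at a time")
# 92 GiB node: 4 jobs at a time; 180 GiB node: 8 jobs at a time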

Torc Overview

Here is what torc does to solve this problem (a conceptual sketch follows the list):

  • User creates a workflow in Python.

  • User passes a runnable function as well as a list of all input parameters that need to be mapped to the function.

  • For each set of input parameters torc creates an object in the user-data collection in the database, creates a job with a relationship (edge) to that object as an input, and creates a placeholder for data to be created by that job.

  • When torc runs each job it reads the correct input parameters from the database, imports the user’s function, and then calls it with the input parameters.

  • When the function completes, torc stores any returned data in the database.

  • When all workers complete, torc collects all result data from the database into a list and passes it to the postprocess function. It also stores any data returned by that function in the database.
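
Conceptually, each mapped job does something like the following (an illustrative sketch, not torc’s actual implementation; the keys match the user-data records shown later in this tutorial):

import importlib

def run_mapped_job(user_data: dict) -> dict:
    """Sketch of what a single run-function job does."""
    # Read the stored parameters, import the user's module, and call the function.
    module = importlib.import_module(user_data["module"])  # e.g. "simulation"
    func = getattr(module, user_data["func"])              # e.g. "run"
    result = func(user_data["params"])
    # torc stores whatever the function returns back in the database.
    return result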

Build the workflow

  1. Write a script to create the workflow. Note that you need to correct the API URL and the Slurm account.

import getpass

from torc import make_api, map_function_to_jobs
from torc.openapi_client import (
    ComputeNodeResourceStatsModel,
    ResourceRequirementsModel,
    SlurmSchedulerModel,
    WorkflowModel,
)

api = make_api("http://localhost:8529/_db/workflows/torc-service")
params = [
    {"job_name": "job1", "input1": 1, "input2": 2, "input3": 3},
    {"job_name": "job2", "input1": 4, "input2": 5, "input3": 6},
    {"job_name": "job3", "input1": 7, "input2": 8, "input3": 9},
]
# Add the workflow to the database so that it receives a key.
workflow = api.add_workflow(
    WorkflowModel(
        user=getpass.getuser(),
        name="my_workflow",
        description="My workflow",
    )
)
rr = api.add_resource_requirements(
    workflow.key,
    ResourceRequirementsModel(name="medium", num_cpus=4, memory="20g", runtime="P0DT1H"),
)
jobs = map_function_to_jobs(
    api,
    workflow.key,
    "simulation",
    "run",
    params,
    resource_requirements=rr.id,
    # Note that this is optional.
    postprocess_func="postprocess",
)
scheduler = api.add_slurm_scheduler(
    workflow.key,
    SlurmSchedulerModel(
        name="short",
        account="my_account",
        mem="180224",
        walltime="04:00:00",
    ),
)
# This is optional, but can be useful to look at actual resource utilization.
config = api.get_workflow_config(workflow.key)
config.compute_node_resource_stats = ComputeNodeResourceStatsModel(
    cpu=True,
    memory=True,
    process=True,
    interval=5,
    monitor_type="periodic",
    make_plots=True,
)
api.modify_workflow_config(workflow.key, config)
print(f"Created workflow {workflow.key} with {len(jobs)} jobs.")

Note

Refer to Python Client API Reference for complete API documentation.

Requirements:

  • Your run function should raise an exception if there is a failure. If that happens, torc will record a non-zero return code for the job.

  • If you want torc to store result data in the database, return it from your run function. Note: this result data must not be large - the database is not designed for that. If you have large result data, return a pointer (i.e., file path) to its location here.

  • If you choose to define a postprocess function and want torc to store the final data in the database, return it from that function.

  • The params must be serializable to JSON because they will be stored in the database. Basic types such as numbers and strings, as well as lists and dictionaries of those, work fine. If you need to store complex, custom types, consider these options:

    • Define data models with Pydantic. You can use their existing serialization/de-serialization methods or define custom methods (see the sketch after this list).

    • Pickle your data and store the result as a string. Your run function would need to understand how to de-serialize it. Note that this has portability limitations. (Please contact the developers if you would like to see this happen automatically.)

  • Torc must be able to import simulation.py from Python. Here are some options:

    • Put the script in the current directory.

    • Install it in the environment.

    • Specify its parent directory like this:

map_function_to_jobs(api, workflow.key, "simulation", "run", params, module_directory="parent_dir")
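
As an example of the Pydantic option mentioned above, a minimal sketch (the model and field names are hypothetical; Pydantic v2 syntax):

from pydantic import BaseModel


class SimulationParams(BaseModel):  # hypothetical model, for illustration only
    job_name: str
    num_iterations: int


# Serialize to plain JSON-compatible dicts before passing them to map_function_to_jobs...
params = [SimulationParams(job_name=f"job{i}", num_iterations=10).model_dump() for i in range(3)]

# ...and reconstruct the typed object inside run().
sim_params = SimulationParams(**params[0])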
  2. Create the workflow.

$ python <your-script>
Created workflow 3141686 with 3 jobs.
  3. Optional: Save the workflow key in the environment to save typing.

$ export TORC_WORKFLOW_KEY=3141686
  4. Optional: view the workflow specification. This illustrates how torc orchestrates the workflow by creating relationships between the jobs and the user_data collection. You may also want to edit the input parameters for future runs.

$ torc workflows show
  5. Initialize the workflow.

$ torc workflows start
2023-08-07 11:51:03,891 - INFO [torc.workflow_manager workflow_manager.py:156] : Changed all uninitialized jobs to ready or blocked.
2023-08-05 11:51:03,894 - INFO [torc.workflow_manager workflow_manager.py:82] : Started workflow
$ torc jobs list
+---------------------------------------------------------------------------------------------+
|                                     Jobs in workflow 3141686                                |
+-------+-------------+---------------------------+---------+------------------------+--------+
| index |     name    |          command          |  status | schedule_compute_nodes |  _key  |
+-------+-------------+---------------------------+---------+------------------------+--------+
|   1   |      0      |   torc jobs run-function  |  ready  |            None        | 788309 |
|   2   |      1      |   torc jobs run-function  |  ready  |            None        | 788323 |
|   3   |      2      |   torc jobs run-function  |  ready  |            None        | 788337 |
|   4   | postprocess | torc jobs run-postprocess | blocked |            None        | 788389 |
+-------+-------------+---------------------------+---------+------------------------+--------+
  6. Schedule compute nodes with Slurm. This example needs only one compute node; you will need to estimate the number for your own jobs (a sketch of this calculation follows the command below).

    The compute nodes in this example can run eight jobs at a time and can complete four rounds of work within the walltime (32 jobs per allocation). So, the number of required compute nodes is num_jobs / 32, rounded up.

$ torc hpc slurm schedule-nodes -n1
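
As a sketch of that estimate (the 300-job count is illustrative; the memory and walltime values come from this tutorial's configuration):

import math

jobs_at_once = 176 // 20                     # ~176 GiB usable memory / 20 GiB per job = 8
rounds = 4 // 1                              # 4-hour walltime / 1-hour jobs = 4
jobs_per_allocation = jobs_at_once * rounds  # 32
num_jobs = 300                               # illustrative; use your actual job count
num_nodes = math.ceil(num_jobs / jobs_per_allocation)
print(num_nodes)                             # 10 nodes; pass this value to -n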
  7. The jobs will run whenever Slurm allocates compute nodes. Monitor status as discussed in Check workflow status.

  8. View the result data overall or by job (if your run and postprocess functions return something). Note that listing all user-data also returns the input parameters.

$ torc user-data list
$ torc jobs list-user-data --stores 788309
$ torc jobs list-user-data --stores 788323
$ torc jobs list-user-data --stores 788337
$ torc jobs list-user-data --stores 788389

Workflow Restarts

If you find that one or more input parameters were incorrect after running the workflow, you can correct them without re-running the entire workflow. Torc stores relationships between the parameters and jobs, and will restart only the affected jobs. Here’s how to do that:

  1. Identify the key(s) for the affected parameters.

$ torc user-data list
[
  {
    "is_ephemeral": false,
    "name": "0",
    "data": {
      "module": "simulation",
      "func": "run",
      "params": {
        "var1": "0",
        "var2": 0
      }
    },
    "key": "3141795",
    "rev": "_gagG-Hy---"
  },
  {
    "is_ephemeral": false,
    "name": "1",
    "data": {
      "module": "simulation",
      "func": "run",
      "params": {
        "var1": "1",
        "var2": 1
      }
    },
    "key": "3141797",
    "rev": "_gagG-H2---"
  },
  {
    "is_ephemeral": false,
    "name": "2",
    "data": {
      "module": "simulation",
      "func": "run",
      "params": {
        "var1": "2",
        "var2": 2
      }
    },
    "key": "3141799",
    "rev": "_gagG-H2--_"
  }
]
  2. Modify the data.

$ torc user-data modify 3141795 -d '{"module":"simulation","func":"run","params":{"var1":"100","var2":100}}'

Note

You can get and set user-data through the Python API. Search for get_user_data_key and put_user_data_key at OpenAPI Client DefaultApi.
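
For example, a sketch of the same modification through the Python API (the method signatures and argument order are assumptions here; confirm them against the DefaultApi reference):

from torc import make_api

api = make_api("http://localhost:8529/_db/workflows/torc-service")
workflow_key = "3141686"

# Assumed signatures; check the DefaultApi reference mentioned above.
item = api.get_user_data_key(workflow_key, "3141795")
item.data["params"]["var2"] = 100
api.put_user_data_key(workflow_key, "3141795", item)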

  3. Restart the workflow.

$ torc workflows restart
  4. Confirm that only one job has a ready status.

$ torc jobs list
  5. Schedule a node to run the job.

$ torc hpc slurm schedule-nodes -n1

Other jobs

You can add “normal” jobs to the workflow as well; for example, you might have additional preprocessing or postprocessing work to do. You can add those jobs through the API. You can also add multiple rounds of mapped functions; map_function_to_jobs provides a blocked_by parameter to specify ordering (see the sketch below). You can also define job-job relationships through files or user-data, as discussed elsewhere in this documentation.
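
For example, a second round of mapped functions could be ordered after the first (a sketch; "refine" is a hypothetical function name, and blocked_by is assumed to accept job keys; confirm this in the API reference):

round1_jobs = map_function_to_jobs(
    api,
    workflow.key,
    "simulation",
    "run",
    params,
    resource_requirements=rr.id,
)
# Hypothetical second stage that consumes the first round's outputs and must wait for it.
round2_jobs = map_function_to_jobs(
    api,
    workflow.key,
    "simulation",
    "refine",
    params,
    resource_requirements=rr.id,
    blocked_by=[job.key for job in round1_jobs],
)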