"""Provides the :py:class:`MultiProcess` class for running a model on `NREL's Kestrel HPC system`_.
.. NREL's Kestrel HPC system: https://nrel.github.io/HPC/Documentation/Systems/Kestrel/
"""
from __future__ import annotations
import time
from pathlib import Path
import pandas as pd
from rich.live import Live
from rex.utilities.hpc import SLURM
from dwind.utils import hpc
from dwind.utils.array import split_by_index
class MultiProcess:
"""Multiprocessing interface for running batch jobs via ``SLURM``.
Parameters
----------
location : str
The state name, or an underscore-separated string of "state_county"
sector : str
One of "fom" (front of meter) or "btm" (back of the meter).
scenario : str
An underscore-separated string for the scenario to be run.
year : int
The year-basis for the scenario.
env : str | Path
The path to the ``dwind`` Python environment that should be used to run the model.
n_nodes : int
Number of nodes to request from the HPC when running an ``sbatch`` job.
memory : int
Node memory, in GB.
walltime : int
Node walltime request, in hours.
    allocation : str
        The HPC project (allocation) handle that will be charged for running the analysis.
    feature : str
        Additional flags for the SLURM job, using formatting such as ``--qos=high`` or
        ``--depend=[state:job_id]``.
    repository : str | Path
        The path to the dwind repository to use for analysis.
model_config : str
The full file path and name of where the model configuration file is located.
    stdout_path : str | Path | None, optional
        The path where all the stdout logs should be written, by default None. When None,
        ":py:attr:`out_path` / logs" (that is, ``dir_out / "chunk_files" / "logs"``) is used.
    dir_out : str | Path | None, optional
        The path to save the chunked results files, by default ``Path.cwd()`` (current working
        directory).
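
    Examples
    --------
    A minimal usage sketch; the import path, environment path, allocation handle, and
    configuration file below are placeholders rather than real values.

    >>> from dwind.multiprocess import MultiProcess  # import path is illustrative
    >>> mp = MultiProcess(
    ...     location="colorado_larimer",
    ...     sector="btm",
    ...     scenario="baseline",
    ...     year=2022,
    ...     env="/path/to/dwind-env",
    ...     n_nodes=4,
    ...     memory=90,
    ...     walltime=4,
    ...     allocation="your_allocation_handle",
    ...     feature="--qos=high",
    ...     repository="/path/to/dwind",
    ...     model_config="/path/to/model_config.toml",
    ... )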
"""
def __init__(
self,
location: str,
sector: str,
scenario: str,
year: int,
env: str | Path,
n_nodes: int,
memory: int,
walltime: int,
allocation: str,
feature: str,
repository: str | Path,
model_config: str,
stdout_path: str | Path | None = None,
dir_out: str | Path | None = None,
):
"""Initialize the ``SLURM`` interface.
Parameters
----------
location : str
The state name, or an underscore-separated string of "state_county"
sector : str
One of "fom" (front of meter) or "btm" (back of the meter).
scenario : str
An underscore-separated string for the scenario to be run, such as "baseline_2022".
year : int
The year-basis for the scenario.
env : str | Path
The path to the ``dwind`` Python environment that should be used to run the model.
n_nodes : int
Number of nodes to request from the HPC when running an ``sbatch`` job.
memory : int
Node memory, in GB.
walltime : int
Node walltime request, in hours.
allocation : str
The HPC project (allocation) handle that will be charged for running the analysis.
feature : str
Additional flags for the SLURM job, using formatting such as ``--qos=high`` or
``--depend=[state:job_id]``.
repository : str | Path
The path to the dwind repository to use for analysis.
model_config : str
The full file path and name of where the model configuration file is located.
        stdout_path : str | Path | None, optional
            The path where all the stdout logs should be written, by default None. When None,
            ":py:attr:`out_path` / logs" (that is, ``dir_out / "chunk_files" / "logs"``) is used.
        dir_out : str | Path | None, optional
The path to save the chunked results files, by default Path.cwd() (current working
directory).
"""
self.run_name = f"{location}_{sector}_{scenario}_{year}"
self.location = location
self.sector = sector
self.scenario = scenario
self.year = year
self.env = env
self.n_nodes = n_nodes
self.memory = memory
self.walltime = walltime
self.alloc = allocation
self.feature = feature
self.stdout_path = stdout_path
self.dir_out = dir_out
self.repository = repository
self.model_config = model_config
# Create the output directory if it doesn't already exist
self.dir_out = Path.cwd() if dir_out is None else Path(self.dir_out).resolve()
self.out_path = self.dir_out / "chunk_files"
        self.out_path.mkdir(parents=True, exist_ok=True)
# Create a new path in the output directory for the logs if a path is not provided
if self.stdout_path is None:
            log_dir = self.out_path / "logs"
            log_dir.mkdir(exist_ok=True)
            self.stdout_path = log_dir
def check_status(self, job_ids: list[int], start_time: float):
"""Prints the status of all :py:attr:`jobs` submitted.
Parameters
----------
job_ids : list[int]
The list of HPC ``job_id``s to check on.
start_time : float
The results of initial ``time.perf_counter()``.
"""
slurm = SLURM()
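        # Build the initial status entry for each job before rendering the live table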
job_status = {
j: {
"status": slurm.check_status(job_id=j),
"start": start_time,
"wait": time.perf_counter() - start_time,
"run": 0,
}
for j in job_ids
}
table, complete = hpc.generate_run_status_table(job_status)
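        # Re-poll SLURM every 5 seconds and refresh the table until all jobs have finished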
with Live(table, refresh_per_second=1) as live:
while not complete:
time.sleep(5)
job_status |= hpc.update_status(job_status)
table, complete = hpc.generate_run_status_table(job_status)
live.update(table)
def run_jobs(self, agent_df: pd.DataFrame) -> dict[str, int]:
"""Run :py:attr:`n_jobs` number of jobs for the :py:attr:`agent_df`.
Args:
agent_df (pandas.DataFrame): The agent DataFrame to be chunked and analyzed.
Returns:
dict[str, int]: Dictionary mapping of each SLURM job id to the chunk run in that job.
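
        Examples
        --------
        A sketch of submitting the chunked jobs for a pre-built agent frame; the parquet
        path is a placeholder, and ``mp`` is assumed to be a configured ``MultiProcess``.

        >>> agents = pd.read_parquet("/path/to/agents.pqt")  # placeholder path
        >>> job_chunk_map = mp.run_jobs(agents)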
"""
agent_df = agent_df.reset_index(drop=True)
# chunks = np.array_split(agent_df, self.n_nodes)
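        # Compute per-node chunk boundaries as (start, end) row positions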
starts, ends = split_by_index(agent_df, self.n_nodes)
job_chunk_map = {}
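        # Build the shared portion of the ``dwind run chunk`` CLI call; only the chunk index
        # varies between jobs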
base_cmd_str = f"module load conda; conda activate {self.env};"
base_cmd_str += " dwind run chunk"
base_args = f" {self.location}"
base_args += f" {self.sector}"
base_args += f" {self.scenario}"
base_args += f" {self.year}"
base_args += f" {self.out_path}"
base_args += f" {self.repository}"
base_args += f" {self.model_config}"
if not (agent_path := self.out_path / "agent_chunks").is_dir():
agent_path.mkdir()
start_time = time.perf_counter()
# for i, (start, end) in enumerate(zip(starts, ends, strict=True)):
for i, (start, end) in enumerate(zip(starts, ends)): # noqa: B905
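            # Write this chunk of agents to parquet for the batch job to read back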
fn = self.out_path / "agent_chunks" / f"agents_{i}.pqt"
agent_df.iloc[start:end].to_parquet(fn)
job_name = f"{self.run_name}_{i}"
cmd_str = f"{base_cmd_str} {i} {base_args}"
print("cmd:", cmd_str)
slurm_manager = SLURM()
job_id, err = slurm_manager.sbatch(
cmd=cmd_str,
alloc=self.alloc,
memory=self.memory,
walltime=self.walltime,
feature=self.feature,
name=job_name,
stdout_path=self.stdout_path,
)
if job_id:
job_chunk_map[job_id] = i
print(f"Kicked off job: {job_name}, with SLURM {job_id=} on Eagle.")
else:
print(
f"{job_name=} was unable to be kicked off due to the following error:\n{err}."
)
# Check on the job statuses until they're complete, then aggregate the results
jobs = [*job_chunk_map]
self.check_status(jobs, start_time)
return job_chunk_map