gaps.legacy.Status

class Status(status_dir)

Bases: Status

Base class for data pipeline health and status information.

Initialize Status.

Parameters:

status_dir (path-like) – Directory containing zero or more job json status files.

Methods

add_job(status_dir, module, job_name[, ...])

Add a job to status json.

as_df([pipe_steps, index_name, include_cols])

Format status as pandas DataFrame.

clear()

Remove all items from D.

copy()

dump()

Dump the status json to disk, with a backup file in case the process gets killed.

fromkeys(iterable[, value])

get(k[,d])

Return D[k] if k is in D, else d. d defaults to None.

items()

Return a set-like object providing a view on D's items.

job_exists(status_dir, job_name[, pipeline_step])

Check whether a job exists and return a bool.

keys()

Return a set-like object providing a view on D's keys.

make_job_file(status_dir, module, job_name, ...)

Make a json file recording the status of a single job.

make_single_job_file(status_dir, ...)

Make a json file recording the status of a single job.

mark_job_as_submitted(status_dir, ...[, ...])

Mark a job in the status json as "submitted".

parse_step_status(status_dir, pipeline_step)

Parse a key from the job status(es) of the given pipeline step.

pop(k[,d])

Remove specified key and return the corresponding value. If key is not found, d is returned if given, otherwise KeyError is raised.

popitem()

Remove and return some (key, value) pair as a 2-tuple; raise KeyError if D is empty.

record_monitor_pid(status_dir, pid)

Make a json file recording the PID of the monitor process.

reload()

Re-load the data from disk.

reset_after(pipeline_step)

Reset status of all pipeline steps after the input one.

retrieve_job_status(status_dir, module, job_name)

Update and retrieve job status.

setdefault(k[,d])

Return D.get(k, d), and also set D[k] = d if k is not in D.

step_index(pipeline_step)

Get pipeline index for the pipeline step, if it exists.

update([E, ]**F)

Update D from mapping/iterable E and F. If E is present and has a .keys() method, does: for k in E: D[k] = E[k]. If E is present and lacks a .keys() method, does: for (k, v) in E: D[k] = v. In either case, this is followed by: for k, v in F.items(): D[k] = v.

update_from_all_job_files([check_hardware, ...])

Update status from all single-job status files.

update_job_status(pipeline_step, job_name[, ...])

Update a single job's status from its single-job status file.

values()

Return an object providing a view on D's values.

Attributes

HIDDEN_SUB_DIR

JOB_STATUS_FILE

MONITOR_PID_FILE

NAMED_STATUS_FILE

job_hardware

Flat list of job hardware options.

job_ids

Flat list of job ids.

classmethod retrieve_job_status(status_dir, module, job_name, hardware='slurm', subprocess_manager=None)

Update and retrieve job status.

Parameters:
  • status_dir (str) – Directory containing json status file.

  • module (str) – Module that the job belongs to.

  • job_name (str) – Unique job name identification.

  • hardware (str) – Name of the hardware this pipeline is being run on: eagle, slurm, or local. Defaults to “slurm”. This specifies how jobs are queried for status.

  • subprocess_manager (None | SLURM) – Optional initialized subprocess manager to use to check job statuses. This can be input with cached queue data to avoid constantly querying the HPC.

Returns:

status (str | None) – Status string or None if job/module not found.

classmethod add_job(status_dir, module, job_name, replace=False, job_attrs=None)

Add a job to status json.

Parameters:
  • status_dir (str) – Directory containing json status file.

  • module (str) – Module that the job belongs to.

  • job_name (str) – Unique job name identification.

  • replace (bool) – Flag to force replacement of pre-existing job status.

  • job_attrs (dict) – Job attributes. Should include ‘job_id’ if running on HPC.

static make_job_file(status_dir, module, job_name, attrs)

Make a json file recording the status of a single job.

Parameters:
  • status_dir (str) – Directory to put json status file.

  • module (str) – Module that the job belongs to.

  • job_name (str) – Unique job name identification.

  • attrs (dict) – Dictionary of job attributes that represent the job status attributes.

as_df(pipe_steps=None, index_name='job_name', include_cols=None)

Format status as pandas DataFrame.

Parameters:
  • pipe_steps (container, optional) – A container of pipeline step names to collect. If None, all pipeline steps in the status file are collected. By default, None.

  • index_name (str, optional) – Name to assign to index of DataFrame. By default, “job_name”.

Returns:

pd.DataFrame – Pandas DataFrame containing status information.

clear() → None

Remove all items from D.
dump()

Dump the status json to disk, with a backup file in case the process gets killed.
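The backup-on-dump idea can be sketched with the standard library alone. This is a hypothetical helper, not the gaps implementation: the name dump_with_backup, the ".bak" suffix, and the write-to-temp-then-rename strategy are all assumptions made for illustration.

```python
import json
import os
import shutil
import tempfile
from pathlib import Path


def dump_with_backup(data, fp):
    """Write ``data`` as JSON to ``fp`` while keeping a backup of the old file.

    Hypothetical helper: the ".bak" suffix and the temp-file-then-rename
    strategy are assumptions made for this sketch.
    """
    fp = Path(fp)
    backup = fp.with_name(fp.name + ".bak")
    # Keep a copy of the previous status file so it survives if the
    # process is killed partway through the write below.
    if fp.exists():
        shutil.copy2(fp, backup)
    # Write to a temporary file in the same directory, then swap it in.
    fd, tmp_name = tempfile.mkstemp(dir=fp.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as fh:
            json.dump(data, fh, indent=4)
        os.replace(tmp_name, fp)
    except BaseException:
        # Clean up the partial temp file; the backup is left in place.
        Path(tmp_name).unlink(missing_ok=True)
        raise
    # The new file is complete, so the backup is no longer needed.
    backup.unlink(missing_ok=True)
```

The temporary file is created next to the target because os.replace is only guaranteed to be atomic on POSIX systems when both paths are on the same filesystem.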

get(k[, d])

Return D[k] if k is in D, else d. d defaults to None.
items()

Return a set-like object providing a view on D's items.
classmethod job_exists(status_dir, job_name, pipeline_step=None)

Check whether a job exists and return a bool.

Returns True if the job name is found as a key under the given pipeline_step, or under any pipeline step if pipeline_step is None.

Parameters:
  • status_dir (str) – Directory containing json status file.

  • job_name (str) – Unique job name identification.

  • pipeline_step (str, optional) – Pipeline step that the job belongs to. By default, None, which checks all pipeline steps for the job name.

Returns:

exists (bool) – True if the job exists in the status json.
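The lookup rule can be illustrated on an in-memory dictionary. This sketch does not read status_dir from disk the way the classmethod does; it assumes a hypothetical status layout of {pipeline_step: {job_name: {...attributes...}}}, and job_exists_in is a made-up name.

```python
def job_exists_in(status, job_name, pipeline_step=None):
    """Return True if ``job_name`` is recorded in ``status``.

    ``status`` is assumed to look like
    {pipeline_step: {job_name: {...job attributes...}}}.
    If ``pipeline_step`` is None, every pipeline step is searched.
    """
    if pipeline_step is None:
        steps = status.values()
    else:
        # A missing step simply means the job cannot exist under it.
        steps = [status.get(pipeline_step, {})]
    return any(isinstance(jobs, dict) and job_name in jobs for jobs in steps)


status = {
    "generation": {"gen_job_0": {"job_status": "successful"}},
    "collect": {"collect_job": {"job_status": "submitted"}},
}
print(job_exists_in(status, "gen_job_0"))             # True
print(job_exists_in(status, "gen_job_0", "collect"))  # False
```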

property job_hardware

Flat list of job hardware options.

Type:

list

property job_ids

Flat list of job ids.

Type:

list

keys()

Return a set-like object providing a view on D's keys.
classmethod make_single_job_file(status_dir, pipeline_step, job_name, attrs)

Make a json file recording the status of a single job.

This method should primarily be used by HPC nodes to mark the status of individual jobs.

Parameters:
  • status_dir (path-like) – Directory to put json status file.

  • pipeline_step (str) – Pipeline step that the job belongs to.

  • job_name (str) – Unique job name identification.

  • attrs (dict) – Dictionary of job attributes that represent the job status attributes.
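A minimal sketch of the one-file-per-job idea follows. The file name pattern jobstatus_<job_name>.json, the nested {pipeline_step: {job_name: attrs}} payload, and the helper name write_single_job_file are hypothetical. One plausible benefit of this design is that jobs running in parallel on different nodes never write to the same file.

```python
import json
from pathlib import Path


def write_single_job_file(status_dir, pipeline_step, job_name, attrs):
    """Write one job's status to its own JSON file and return the path.

    The "jobstatus_<job_name>.json" name and the nested payload layout are
    hypothetical choices for this sketch.
    """
    status_dir = Path(status_dir)
    status_dir.mkdir(parents=True, exist_ok=True)
    fp = status_dir / f"jobstatus_{job_name}.json"
    # Each job writes only its own file, so parallel jobs never clash.
    with open(fp, "w") as fh:
        json.dump({pipeline_step: {job_name: attrs}}, fh, indent=4)
    return fp
```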

classmethod mark_job_as_submitted(status_dir, pipeline_step, job_name, replace=False, job_attrs=None)

Mark a job in the status json as “submitted”.

The status json does not have to exist; it is created if missing. If it does exist, the job is only added if it is not already present (i.e., not yet submitted), unless replace is set to True.

Parameters:
  • status_dir (path-like) – Directory containing json status file.

  • pipeline_step (str) – Pipeline step that the job belongs to.

  • job_name (str) – Unique job name identification.

  • replace (bool, optional) – Flag to force replacement of pre-existing job status. By default, False.

  • job_attrs (dict, optional) – Job attributes. Should include ‘job_id’ if running on HPC. By default, None.
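The replace rule can be seen in an in-memory analogue. The real classmethod also reads and writes the status json in status_dir; this sketch skips all file I/O, and the helper name mark_submitted, the "job_status" key, and the boolean return value are assumptions for illustration only.

```python
def mark_submitted(status, pipeline_step, job_name, replace=False, job_attrs=None):
    """Record ``job_name`` as "submitted" in an in-memory status dict.

    Returns True if an entry was written. The "job_status" key name is an
    assumption for this sketch.
    """
    # Create the pipeline step entry if it is missing.
    jobs = status.setdefault(pipeline_step, {})
    if job_name in jobs and not replace:
        # Job already recorded (i.e. previously submitted): leave it alone.
        return False
    attrs = dict(job_attrs or {})
    attrs["job_status"] = "submitted"
    jobs[job_name] = attrs
    return True


status = {}
mark_submitted(status, "generation", "gen_job_0", job_attrs={"job_id": 1234})
print(status)
# {'generation': {'gen_job_0': {'job_id': 1234, 'job_status': 'submitted'}}}
```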

classmethod parse_step_status(status_dir, pipeline_step, key=StatusField.OUT_FILE)

Parse a key from the job status(es) of the given pipeline step.

This method DOES NOT check the HPC queue for jobs and therefore DOES NOT update the status of previously running jobs that have errored out of the HPC queue.

Parameters:
  • status_dir (path-like) – Directory containing the status file to parse.

  • pipeline_step (str) – Target pipeline step to parse.

  • key (StatusField | str, optional) – Status key to parse from each job in pipeline_step (typically an output of a previous pipeline step). By default, StatusField.OUT_FILE.

Returns:

list – Arguments parsed from the status file in status_dir from the input pipeline step. This list is empty if the key is not found in the job status, or if the pipeline step does not exist in status.
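The parsing behavior can be sketched on a plain dictionary. Here the string "out_file" stands in for StatusField.OUT_FILE (its actual value is an assumption), the {pipeline_step: {job_name: {...}}} layout is hypothetical, and collect_step_key is a made-up name. Like the method described above, the sketch only reads recorded data and never queries an HPC queue.

```python
def collect_step_key(status, pipeline_step, key="out_file"):
    """Collect ``key`` from each job recorded under ``pipeline_step``.

    Only the recorded data is read; no HPC queue is queried. Returns an
    empty list if the step is missing or no job carries ``key``.
    """
    jobs = status.get(pipeline_step, {})
    return [
        attrs[key]
        for attrs in jobs.values()
        if isinstance(attrs, dict) and key in attrs
    ]


status = {
    "generation": {
        "gen_job_0": {"job_status": "successful", "out_file": "gen_0.h5"},
        "gen_job_1": {"job_status": "successful", "out_file": "gen_1.h5"},
        "gen_job_2": {"job_status": "failed"},
    }
}
print(collect_step_key(status, "generation"))  # ['gen_0.h5', 'gen_1.h5']
print(collect_step_key(status, "collect"))     # []
```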

pop(k[, d]) → v

Remove specified key and return the corresponding value.

If key is not found, d is returned if given, otherwise KeyError is raised.

popitem() → (k, v)

Remove and return some (key, value) pair as a 2-tuple; raise KeyError if D is empty.

classmethod record_monitor_pid(status_dir, pid)

Make a json file recording the PID of the monitor process.

Parameters:
  • status_dir (path-like) – Directory to put json status file.

  • pid (int) – PID of the monitoring process.

reload()

Re-load the data from disk.

reset_after(pipeline_step)

Reset status of all pipeline steps after the input one.

Parameters:

pipeline_step (str) – Pipeline step that delineates which parts of the status should be reset. If this pipeline step is not found in the status, nothing is reset. The status for this pipeline step is left untouched; only the statuses of the steps that follow it are reset.

setdefault(k[, d])

Return D.get(k, d), and also set D[k] = d if k is not in D.
step_index(pipeline_step)

Get pipeline index for the pipeline step, if it exists.

Parameters:

pipeline_step (str) – Name of pipeline step.

Returns:

int | None – Pipeline index of pipeline step if it is found in the status, None otherwise.
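step_index and reset_after can be sketched together on a plain dictionary. The sketch assumes, hypothetically, that each pipeline step entry stores its position under a "pipeline_index" key and that resetting a step means dropping its job entries while keeping that index; neither detail is confirmed by the documentation above, and find_step_index and reset_steps_after are made-up names.

```python
PIPELINE_INDEX = "pipeline_index"  # assumed key name for this sketch


def find_step_index(status, pipeline_step):
    """Return the stored pipeline index for ``pipeline_step``, or None."""
    step = status.get(pipeline_step)
    if not isinstance(step, dict):
        return None
    return step.get(PIPELINE_INDEX)


def reset_steps_after(status, pipeline_step):
    """Drop job entries of every step that runs after ``pipeline_step``.

    The given step is untouched, and nothing happens if it is not found.
    """
    index = find_step_index(status, pipeline_step)
    if index is None:
        return
    for name, step in list(status.items()):
        step_idx = step.get(PIPELINE_INDEX) if isinstance(step, dict) else None
        if step_idx is not None and step_idx > index:
            # Keep only the ordering information for later steps.
            status[name] = {PIPELINE_INDEX: step_idx}


status = {
    "generation": {PIPELINE_INDEX: 0, "gen_job_0": {"job_status": "successful"}},
    "collect": {PIPELINE_INDEX: 1, "collect_job": {"job_status": "failed"}},
    "qa": {PIPELINE_INDEX: 2, "qa_job": {"job_status": "submitted"}},
}
reset_steps_after(status, "generation")
print(status["collect"], status["qa"])  # {'pipeline_index': 1} {'pipeline_index': 2}
```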

update([E, ]**F) → None

Update D from mapping/iterable E and F. If E is present and has a .keys() method, does: for k in E: D[k] = E[k]. If E is present and lacks a .keys() method, does: for (k, v) in E: D[k] = v. In either case, this is followed by: for k, v in F.items(): D[k] = v.
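The three update rules can be checked with a built-in dict, which follows the same mapping protocol; this example uses dict.update directly rather than a Status instance.

```python
d = {"a": 1}
d.update({"b": 2})          # E has .keys(): copied key by key
d.update([("c", 3)])        # E lacks .keys(): treated as (key, value) pairs
d.update({"a": 10}, a=100)  # keyword arguments (F) are applied last
print(d)  # {'a': 100, 'b': 2, 'c': 3}
```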

update_from_all_job_files(check_hardware=False, purge=True)

Update status from all single-job status files.

This method loads all single-job status files in the target directory and updates the Status object with the single-job statuses.

Parameters:
  • check_hardware (bool, optional) – Option to check hardware status for job failures for jobs with a “running” status. This is useful because the “running” status may not be correctly updated if the job terminates abnormally on the HPC. By default, False.

  • purge (bool, optional) – Option to purge the individual status files. By default, True.

Returns:

Status – This instance of Status with updated job properties.
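A minimal sketch of the merge-and-purge loop follows. It assumes hypothetical conventions: single-job files named jobstatus_*.json, each holding {pipeline_step: {job_name: attrs}}; merge_job_files is a made-up name. The check_hardware option is omitted because it would require querying the HPC scheduler.

```python
import json
from pathlib import Path


def merge_job_files(status, status_dir, purge=True, pattern="jobstatus_*.json"):
    """Merge every single-job status file in ``status_dir`` into ``status``.

    Each file is assumed (hypothetically) to hold
    {pipeline_step: {job_name: {...attributes...}}}. Files are deleted
    after they are merged when ``purge`` is True. Returns ``status``.
    """
    for fp in sorted(Path(status_dir).glob(pattern)):
        with open(fp) as fh:
            single = json.load(fh)
        for step, jobs in single.items():
            step_jobs = status.setdefault(step, {})
            for job_name, attrs in jobs.items():
                # Attributes from the single-job file override older values.
                step_jobs.setdefault(job_name, {}).update(attrs)
        if purge:
            # Remove the file only after its contents have been merged.
            fp.unlink()
    return status
```

Returning the same dictionary mirrors the method's documented return value, which is the updated Status instance itself.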

update_job_status(pipeline_step, job_name, hardware_status_retriever=None)

Update a single job's status from its single-job status file.

If the status for a given pipeline step/job name combination is not found, the status object remains unchanged.

Parameters:
  • pipeline_step (str) – Pipeline step that the job belongs to.

  • job_name (str) – Unique job name identification.

  • hardware_status_retriever (HardwareStatusRetriever, optional) – Hardware status retriever. By default, None, which creates an instance internally.

values()

Return an object providing a view on D's values.