Source code for jade.jobs.results_aggregator

"""Synchronizes updates to the results file across jobs."""

import csv
import logging
import os
import time
from pathlib import Path

from filelock import SoftFileLock, Timeout

from jade.common import RESULTS_DIR
from jade.result import Result, deserialize_result, serialize_result


LOCK_TIMEOUT = 300
PROCESSED_RESULTS_FILENAME = "processed_results.csv"

logger = logging.getLogger(__name__)


class ResultsAggregator:
    """Synchronizes updates to the results file.

    One instance is used to aggregate results from all compute nodes.
    One instance is used for each compute node.

    """

    def __init__(self, filename, timeout=LOCK_TIMEOUT, delimiter=","):
        """Constructs ResultsAggregator.

        Parameters
        ----------
        filename : Path
            Results file.
        timeout : int
            Lock acquisition timeout in seconds.
        delimiter : str
            Delimiter to use for CSV formatting.

        """
        self._filename = filename
        self._lock_file = self._filename.parent / (self._filename.name + ".lock")
        self._timeout = timeout
        self._delimiter = delimiter
        self._is_node = "batch" in filename.name
    @classmethod
    def create(cls, output_dir, **kwargs):
        """Create a new instance.

        Parameters
        ----------
        output_dir : str

        Returns
        -------
        ResultsAggregator

        """
        agg = cls(Path(output_dir) / PROCESSED_RESULTS_FILENAME, **kwargs)
        agg.create_files()
        return agg
    @classmethod
    def load(cls, output_dir, **kwargs):
        """Load an instance from an output directory.

        Parameters
        ----------
        output_dir : str

        Returns
        -------
        ResultsAggregator

        """
        return cls(Path(output_dir) / PROCESSED_RESULTS_FILENAME, **kwargs)
    @classmethod
    def load_node_results(cls, output_dir, batch_id, **kwargs):
        """Load a per-node instance from an output directory.

        Parameters
        ----------
        output_dir : str
        batch_id : int

        Returns
        -------
        ResultsAggregator

        """
        return cls(Path(output_dir) / RESULTS_DIR / f"results_batch_{batch_id}.csv", **kwargs)
    @classmethod
    def load_node_results_file(cls, path, **kwargs):
        """Load a per-node instance from a results file path.

        Parameters
        ----------
        path : Path

        Returns
        -------
        ResultsAggregator

        """
        return cls(path, **kwargs)
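    # Illustrative usage of the constructors above (the paths and batch id are
    # hypothetical): the parent process owns the consolidated file, while each
    # compute node writes to its own per-batch file.
    #
    #   aggregator = ResultsAggregator.create("/projects/my-run/output")  # parent
    #   node_agg = ResultsAggregator.load_node_results(
    #       "/projects/my-run/output", batch_id=1
    #   )  # one per compute node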
    @staticmethod
    def _get_fields():
        return Result._fields

    def _do_action_under_lock(self, func, *args, **kwargs):
        # Using this instead of FileLock because it will be used across nodes
        # on the Lustre filesystem.
        lock = SoftFileLock(self._lock_file, timeout=self._timeout)
        start = time.time()
        try:
            lock.acquire(timeout=self._timeout)
        except Timeout:
            # Picked a default value such that this should not trip. If it does
            # trip under normal circumstances then we need to reconsider this.
            logger.error(
                "Failed to acquire file lock %s within %s seconds", self._lock_file, self._timeout
            )
            raise

        duration = time.time() - start
        if duration > 10:
            logger.warning("Acquiring ResultsAggregator lock took too long: %s", duration)

        try:
            return func(*args, **kwargs)
        finally:
            lock.release()
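    # Illustrative sketch of the locking pattern above. SoftFileLock relies only
    # on the existence of the lock file rather than on OS-level advisory locks,
    # which is why it behaves consistently across nodes sharing a Lustre mount.
    # The path below is hypothetical.
    #
    #   from filelock import SoftFileLock, Timeout
    #   try:
    #       with SoftFileLock("/shared/processed_results.csv.lock", timeout=300):
    #           ...  # read or modify /shared/processed_results.csv
    #   except Timeout:
    #       ...  # another node held the lock past the timeout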
    def create_files(self):
        """Initialize the results file. Should only be called by the parent process."""
        self._do_action_under_lock(self._create_files)
    def _create_files(self):
        with open(self._filename, "w") as f_out:
            f_out.write(self._delimiter.join(self._get_fields()))
            f_out.write("\n")
    @classmethod
    def append(cls, output_dir, result, batch_id=None):
        """Append a result to the file.

        Parameters
        ----------
        output_dir : str
        result : Result
        batch_id : int

        """
        if batch_id is None:
            aggregator = cls.load(output_dir)
        else:
            aggregator = cls.load_node_results(output_dir, batch_id)
        aggregator.append_result(result)
    def append_result(self, result):
        """Append a result to the file.

        Parameters
        ----------
        result : Result

        """
        start = time.time()
        text = self._delimiter.join([str(getattr(result, x)) for x in self._get_fields()])
        self._do_action_under_lock(self._append_result, text)
        duration = time.time() - start
        if duration > 10:
            logger.warning("Appending a result took too long: %s", duration)
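    # Hypothetical worker-side usage (illustrative only): a job process on
    # batch 1 records its outcome under the per-node file's lock. The Result
    # constructor arguments shown are assumed from the fields this module
    # coerces in _get_results.
    #
    #   result = Result(name="job1", return_code=0, status="finished",
    #                   exec_time_s=12.5, completion_time=time.time())
    #   ResultsAggregator.append(output_dir, result, batch_id=1)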
    def _append_result(self, text):
        with open(self._filename, "a") as f_out:
            if f_out.tell() == 0:
                f_out.write(self._delimiter.join(self._get_fields()))
                f_out.write("\n")
            f_out.write(text)
            f_out.write("\n")

    def _append_processed_results(self, results):
        assert not self._is_node
        with open(self._filename, "a") as f_out:
            for result in results:
                text = self._delimiter.join([str(getattr(result, x)) for x in self._get_fields()])
                f_out.write(text)
                f_out.write("\n")
    def clear_results_for_resubmission(self, jobs_to_resubmit):
        """Remove jobs that will be resubmitted from the results file.

        Parameters
        ----------
        jobs_to_resubmit : set
            Job names that will be resubmitted.

        """
        results = [x for x in self.get_results() if x.name not in jobs_to_resubmit]
        self._write_results(results)
        logger.info(
            "Cleared results for resubmitted jobs; %s results remain in %s",
            len(results),
            self._filename,
        )
    def clear_unsuccessful_results(self):
        """Remove failed and canceled results from the results file."""
        results = [x for x in self.get_results() if x.return_code == 0]
        self._write_results(results)
        logger.info("Cleared failed results from %s", self._filename)
    def _write_results(self, results):
        _results = [serialize_result(x) for x in results]
        with open(self._filename, "w") as f_out:
            writer = csv.DictWriter(
                f_out, fieldnames=self._get_fields(), delimiter=self._delimiter
            )
            writer.writeheader()
            if results:
                writer.writerows(_results)
    def get_results(self):
        """Return the current results.

        Returns
        -------
        list
            list of Result objects

        """
        return self._do_action_under_lock(self._get_all_results)
    def get_results_unsafe(self):
        """Return the results. It is up to the caller to ensure that a lock is
        not needed.

        Returns
        -------
        list
            list of Result objects

        """
        return self._get_results()
    def _get_all_results(self):
        unprocessed_results = list((self._filename.parent / RESULTS_DIR).glob("results*.csv"))
        if unprocessed_results:
            logger.error("Found unprocessed results: %s", unprocessed_results)
        # TODO: Older code included unprocessed results here. Not sure why.
        return self._get_results()

    def _get_results(self):
        with open(self._filename) as f_in:
            results = []
            reader = csv.DictReader(f_in, delimiter=self._delimiter)
            for row in reader:
                row["return_code"] = int(row["return_code"])
                row["exec_time_s"] = float(row["exec_time_s"])
                row["completion_time"] = float(row["completion_time"])
                result = deserialize_result(row)
                results.append(result)
            return results
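    # For orientation (format inferred from _create_files and _get_results):
    # with the default delimiter the file is a plain CSV whose header row comes
    # from Result._fields, e.g. something like:
    #
    #   name,return_code,status,exec_time_s,completion_time
    #   job1,0,finished,12.5,1700000000.0
    #
    # _get_results coerces the numeric columns back to int/float before handing
    # each row to deserialize_result.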
    def move_results(self, func):
        """Move the results to a new location and delete the file.

        Parameters
        ----------
        func : function

        Returns
        -------
        list
            list of Result

        """
        return self._do_action_under_lock(self._move_results, func)
    def _move_results(self, func):
        results = self._get_results()
        func(results)
        os.remove(self._filename)
        return results
    @classmethod
    def list_results(cls, output_dir, **kwargs):
        """Return the current results.

        Parameters
        ----------
        output_dir : str

        Returns
        -------
        list
            list of Result objects

        """
        results = cls.load(output_dir, **kwargs)
        return results.get_results()
    def process_results(self):
        """Move all temp results into the consolidated file, then delete the
        per-node files.

        Returns
        -------
        list
            list of Result objects that are newly completed

        """
        assert not self._is_node
        return self._do_action_under_lock(self._process_results)
    def _get_node_results_files(self):
        assert not self._is_node
        return list((self._filename.parent / RESULTS_DIR).glob("results_batch_*.csv"))

    def _process_results(self):
        results = []
        for path in self._get_node_results_files():
            agg = ResultsAggregator.load_node_results_file(path)
            results += agg.move_results(self._append_processed_results)
        return results
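# Illustrative end-to-end flow (a sketch; the directory name is hypothetical):
#
#   output_dir = "/projects/my-run/output"
#   aggregator = ResultsAggregator.create(output_dir)  # parent process, once
#   # ...compute nodes append to <output_dir>/<RESULTS_DIR>/results_batch_*.csv...
#   new_results = aggregator.process_results()  # parent: consolidate per-batch
#                                               # rows into processed_results.csv
#                                               # and delete the per-batch files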