# Source code for jade.events

"""
This module contains StructuredLogEvent and EventSummary classes.
"""

from collections import defaultdict
import json
import logging
import os
import sys
from datetime import datetime
from pathlib import Path

import pandas as pd
from prettytable import PrettyTable

from jade.common import JOBS_OUTPUT_DIR, EVENTS_DIR
from jade.exceptions import InvalidConfiguration
from jade.utils.utils import dump_data, load_data


# File name used for consolidated events output.
EVENTS_FILENAME = "events.json"

# Event categories used to group related events for reporting.
EVENT_CATEGORY_ERROR = "Error"
EVENT_CATEGORY_HPC = "HPC"
EVENT_CATEGORY_RESOURCE_UTIL = "ResourceUtilization"

# Well-known event names recorded throughout JADE. The resource-utilization
# stats names (cpu/disk/mem/net/process) are stored as Parquet dataframes by
# EventsSummary rather than as JSON event files.
EVENT_NAME_HPC_SUBMIT = "hpc_submit"
EVENT_NAME_HPC_JOB_ASSIGNED = "hpc_job_assigned"
EVENT_NAME_HPC_JOB_STATE_CHANGE = "hpc_job_state_change"
EVENT_NAME_CPU_STATS = "cpu_stats"
EVENT_NAME_DISK_STATS = "disk_stats"
EVENT_NAME_MEMORY_STATS = "mem_stats"
EVENT_NAME_NETWORK_STATS = "net_stats"
EVENT_NAME_PROCESS_STATS = "process_stats"
EVENT_NAME_BYTES_CONSUMED = "bytes_consumed"
EVENT_NAME_UNHANDLED_ERROR = "unhandled_error"
EVENT_NAME_ERROR_LOG = "log_error"
EVENT_NAME_SUBMIT_STARTED = "submit_started"
EVENT_NAME_SUBMIT_COMPLETED = "submit_completed"
EVENT_NAME_CONFIG_EXEC_SUMMARY = "config_exec_summary"

logger = logging.getLogger(__name__)


class StructuredLogEvent:
    """A class for recording structured log events."""

    def __init__(self, source, category, name, message, **kwargs):
        """
        Initialize the class

        Parameters
        ----------
        source: str,
            The source of the event.
        category: str,
            An event category given by the user.
        name: str,
            An event name given by the user.
        message:
            An event message given the user.

        kwargs:
            Other information that the user needs to record into event.
        """
        self.source = source
        self.category = category
        self.name = name
        self.message = message
        # Recorded so deserialize_event() can reconstruct the right subclass.
        self.event_class = self.__class__.__name__
        # Keep the original timestamp when rebuilding a deserialized event;
        # stamp brand-new events with the current time.
        if "timestamp" in kwargs:
            self.timestamp = kwargs.pop("timestamp")
        else:
            self.timestamp = str(datetime.now())
        # Remaining keyword args are free-form event data.
        self.data = kwargs

    def base_field_names(self):
        """Return the base field names for the event.

        Returns
        -------
        list
        """
        return self._base_field_names()

    @staticmethod
    def _base_field_names():
        return ["timestamp", "source", "message"]

    def field_names(self):
        """Return all field names for the event.

        Returns
        -------
        list
        """
        return self.base_field_names() + list(self.data.keys())

    def values(self):
        """Return the values for all fields in the event.

        Returns
        -------
        list
        """
        # Account for events generated with different versions of code.
        values = [getattr(self, x, "") for x in self.base_field_names()]
        values += [self.data.get(x, "") for x in self.data]
        return values

    @classmethod
    def deserialize(cls, record):
        """Deserialize event from JSON.

        Parameters
        ----------
        record : dict

        Returns
        -------
        StructuredLogEvent
        """
        # Use .get with defaults for every field, including "data", so records
        # written by older versions of the code still deserialize.
        return cls(
            source=record.get("source", ""),
            category=record.get("category", ""),
            name=record.get("name", ""),
            message=record.get("message", ""),
            timestamp=record.get("timestamp", ""),
            **record.get("data", {}),
        )

    def __str__(self):
        """Format an event instance as a JSON string."""
        return json.dumps(self.__dict__, sort_keys=True)

    def to_dict(self):
        """Convert event object to dict"""
        return self.__dict__
class StructuredErrorLogEvent(StructuredLogEvent):
    """Event specific to exceptions"""

    def __init__(self, source, category, name, message, **kwargs):
        """Must be called in an exception context."""
        super().__init__(source, category, name, message, **kwargs)
        # A deserialized record already carries the exception details; only
        # freshly-raised errors need sys.exc_info() parsed.
        if "exception" not in kwargs:
            self._parse_traceback()

    def base_field_names(self):
        """Return the base field names for the event.

        Returns
        -------
        list
        """
        return self._base_field_names()

    def _parse_traceback(self):
        """Parse the active exception: type, message, filename, and lineno."""
        err_type, err_value, tb_obj = sys.exc_info()
        assert tb_obj is not None, "must be called in an exception context"
        self.data.update(
            exception=str(err_type),
            error=str(err_value).strip(),
            filename=os.path.basename(tb_obj.tb_frame.f_code.co_filename),
            lineno=tb_obj.tb_lineno,
        )
def deserialize_event(data):
    """Construct an event from raw data.

    Parameters
    ----------
    data : dict

    Returns
    -------
    StructuredLogEvent
    """
    # Dispatch on the recorded class name.
    deserializers = {
        "StructuredLogEvent": StructuredLogEvent.deserialize,
        "StructuredErrorLogEvent": StructuredErrorLogEvent.deserialize,
    }
    deserializer = deserializers.get(data["event_class"])
    if deserializer is None:
        raise Exception(f"unknown event class {data['event_class']}")
    return deserializer(data)
class EventsSummary:
    """Provides summary of all events."""

    # Event names whose records are numerous and tabular; they are persisted
    # as Parquet dataframes instead of JSON event files.
    RESOURCE_STATS = set(
        (
            EVENT_NAME_CPU_STATS,
            EVENT_NAME_DISK_STATS,
            EVENT_NAME_MEMORY_STATS,
            EVENT_NAME_NETWORK_STATS,
            EVENT_NAME_PROCESS_STATS,
        )
    )

    def __init__(self, output_dir, preload=False, optimize_resoure_stats=True):
        """
        Initialize EventsSummary class

        Parameters
        ----------
        output_dir: str
            Path of jade output directory.
        preload: bool
            Load all events into memory; otherwise, load by name on demand.
        """
        # NOTE(review): "optimize_resoure_stats" looks like a typo of
        # "optimize_resource_stats"; renaming would break keyword callers, so
        # it is left as-is. The stored flag is not read anywhere in this
        # class — confirm whether it is still used.
        self._optimize_resource_stats = optimize_resoure_stats
        self._events = defaultdict(list)
        self._output_dir = output_dir
        self._event_dir = Path(output_dir) / EVENTS_DIR
        self._event_dir.mkdir(exist_ok=True)
        self._job_outputs_dir = os.path.join(output_dir, JOBS_OUTPUT_DIR)
        event_files = list(self._event_dir.iterdir())
        if not event_files:
            # First run against this output dir: merge the raw *events.log
            # files and persist one summary file per event name.
            self._consolidate_events()
            self._save_events_summary()
        elif preload:
            self._load_all_events()
        # else, events have already been consolidated, load them on demand

    def _iter_event_files(self):
        # Raw structured-log files written during the run.
        return Path(self._output_dir).glob("*events.log")

    def _consolidate_events(self):
        """Find most recent event log files, and merge event data together."""
        # PERF: This could easily be parallelized across processes. Need to be sure we aren't on
        # a login node.
        for event_file in self._iter_event_files():
            with open(event_file, "r") as f:
                # Each line is one JSON-serialized event record.
                for line in f:
                    record = json.loads(line)
                    event = deserialize_event(record)
                    self._events[event.name].append(event)
        # Timestamps are string-formatted datetimes, so lexicographic sort
        # orders events chronologically.
        for name in self._events.keys():
            self._events[name].sort(key=lambda x: x.timestamp)

    def _deserialize_events(self, name, path):
        # Load one consolidated JSON event file into memory.
        self._events[name] = [deserialize_event(x) for x in load_data(path)]

    def _get_events(self, name):
        # Lazy-load the per-name event file on first access.
        if name not in self._events:
            self._load_event_file(name)
        return self._events.get(name, [])

    def _load_all_events(self):
        for filename in self._event_dir.iterdir():
            name = filename.stem
            # Resource stats live in Parquet; never loaded as event objects.
            if name not in self._events and name not in self.RESOURCE_STATS:
                self._deserialize_events(name, filename)

    def _load_event_file(self, name):
        if name in self.RESOURCE_STATS:
            raise Exception(f"event {name} is only available in Parquet")
        filename = self._make_event_filename(name)
        if filename.exists():
            self._deserialize_events(name, filename)

    def _make_data_filename(self, name):
        # Parquet file holding tabular resource stats for this event name.
        return self._event_dir / (name + ".parquet")

    def _make_event_filename(self, name):
        # JSON file holding regular events for this event name.
        return self._event_dir / (name + ".json")

    def _save_events_summary(self):
        """Save events to one file per event name."""
        for name, events in self._events.items():
            if name in self.RESOURCE_STATS:
                # Flatten resource-stat events into rows for a dataframe.
                dict_events = []
                for event in events:
                    data = {"timestamp": event.timestamp, "source": event.source}
                    if name == EVENT_NAME_PROCESS_STATS:
                        # One row per monitored process in the event.
                        for process in event.data["processes"]:
                            _data = {"timestamp": event.timestamp, "source": event.source}
                            _data.update(process)
                            dict_events.append(_data)
                    else:
                        data.update(event.data)
                        dict_events.append(data)
                df = pd.DataFrame.from_records(dict_events, index="timestamp")
                filename = self._make_data_filename(name)
                df.to_parquet(filename)
            else:
                dict_events = [event.to_dict() for event in events]
                filename = self._make_event_filename(name)
                dump_data(dict_events, filename)
        # Drop the large resource-stat events from memory; they are on disk.
        for name in self.RESOURCE_STATS:
            self._events.pop(name, None)

    def get_bytes_consumed(self):
        """Return a sum of all bytes_consumed events.

        Returns
        -------
        int
            Size in bytes of files produced by all jobs
        """
        total = 0
        for event in self.iter_events(EVENT_NAME_BYTES_CONSUMED):
            total += event.data["bytes_consumed"]
        return total

    def get_config_exec_time(self):
        """Return the total number of seconds to run all jobs in the config.

        Returns
        -------
        int

        Raises
        ------
        InvalidConfiguration
            Raised if no config_exec_summary events exist.
        """
        events = self.list_events(EVENT_NAME_CONFIG_EXEC_SUMMARY)
        if not events:
            raise InvalidConfiguration("no batch summary events found")
        return events[0].data["config_execution_time"]

    def get_dataframe(self, name):
        """Return the dataframe for this event name. Only applicable to resource stats."""
        filename = self._make_data_filename(name)
        if not filename.exists():
            # No stats were recorded for this name; return an empty frame.
            return pd.DataFrame()
        return pd.read_parquet(filename)

    def iter_events(self, name):
        """Return a generator over events with name.

        Parameters
        ----------
        name : str

        Yields
        ------
        event : StructuredLogEvent
        """
        for event in self._get_events(name):
            yield event

    def list_events(self, name):
        """Return the events of type name.

        Returns
        -------
        list
            list of StructuredLogEvent
        """
        return list(self._get_events(name))

    def list_unique_categories(self):
        """Return the unique event categories in the log. Will cause all
        events to get loaded into memory.

        Returns
        -------
        list
        """
        self._load_all_events()
        categories = set()
        for events in self._events.values():
            if not events:
                continue
            # Events sharing a name share a category; inspect the first one.
            categories.add(events[0].category)
        categories = list(categories)
        categories.sort()
        return categories

    def list_unique_names(self):
        """Return the unique event names in the log.

        Returns
        -------
        list
        """
        # NOTE(review): x.name includes the ".json" suffix, so the returned
        # "names" are filenames; x.stem may have been intended — confirm.
        return [x.name for x in self._event_dir.iterdir() if x.suffix == ".json"]

    def show_events(self, name):
        """Print tabular events in terminal"""
        table = PrettyTable()
        field_names = None
        count = 0
        for event in self.iter_events(name):
            if field_names is None:
                # Take the column headers from the first event.
                field_names = event.field_names()
            table.add_row(event.values())
            count += 1
        if count == 0:
            print(f"No events of type {name}")
            return
        table.field_names = field_names
        print(f"Events of type {name} from directory: {self._output_dir}")
        print(table)
        print(f"Total events: {count}\n")

    def show_events_in_category(self, category):
        """Print tabular events matching category in terminal.

        Will cause all events to get loaded into memory.
        """
        event_names = []
        self._load_all_events()
        for name, events in self._events.items():
            if not events:
                continue
            if events[0].category == category:
                event_names.append(name)
        if not event_names:
            print(f"There are no events in category {category}")
            return
        for event_name in sorted(event_names):
            self.show_events(event_name)

    def show_event_categories(self):
        """Show the unique event categories in the log."""
        # NOTE(review): "Catgories" is a typo in user-facing output, but it is
        # runtime text and left unchanged here.
        print("Catgories: {}".format(" ".join(self.list_unique_categories())))

    def show_event_names(self):
        """Show the unique event names in the log."""
        print("Names: {}".format(" ".join(self.list_unique_names())))

    def to_json(self):
        """Return all events in JSON format.

        Returns
        -------
        str
        """
        self._load_all_events()
        return json.dumps(
            [x.to_dict() for events in self._events.values() for x in events], indent=2
        )