"""
This module contains StructuredLogEvent and EventSummary classes.
"""
from collections import defaultdict
import json
import logging
import os
import sys
from datetime import datetime
from pathlib import Path
import pandas as pd
from prettytable import PrettyTable
from jade.common import JOBS_OUTPUT_DIR, EVENTS_DIR
from jade.exceptions import InvalidConfiguration
from jade.utils.utils import dump_data, load_data
# Default filename for consolidated events.
EVENTS_FILENAME = "events.json"
# Event categories recorded in StructuredLogEvent.category.
EVENT_CATEGORY_ERROR = "Error"
EVENT_CATEGORY_HPC = "HPC"
EVENT_CATEGORY_RESOURCE_UTIL = "ResourceUtilization"
# Event names recorded in StructuredLogEvent.name.
EVENT_NAME_HPC_SUBMIT = "hpc_submit"
EVENT_NAME_HPC_JOB_ASSIGNED = "hpc_job_assigned"
EVENT_NAME_HPC_JOB_STATE_CHANGE = "hpc_job_state_change"
# Resource-utilization stat events (stored as Parquet by EventsSummary).
EVENT_NAME_CPU_STATS = "cpu_stats"
EVENT_NAME_DISK_STATS = "disk_stats"
EVENT_NAME_MEMORY_STATS = "mem_stats"
EVENT_NAME_NETWORK_STATS = "net_stats"
EVENT_NAME_PROCESS_STATS = "process_stats"
EVENT_NAME_BYTES_CONSUMED = "bytes_consumed"
EVENT_NAME_UNHANDLED_ERROR = "unhandled_error"
EVENT_NAME_ERROR_LOG = "log_error"
EVENT_NAME_SUBMIT_STARTED = "submit_started"
EVENT_NAME_SUBMIT_COMPLETED = "submit_completed"
EVENT_NAME_CONFIG_EXEC_SUMMARY = "config_exec_summary"
logger = logging.getLogger(__name__)
class StructuredLogEvent:
    """A structured, JSON-serializable log event.

    Stores fixed base fields (timestamp, source, message) plus arbitrary
    user-supplied key/value data in ``self.data``.
    """

    def __init__(self, source, category, name, message, **kwargs):
        """Initialize the event.

        Parameters
        ----------
        source : str
            The source of the event.
        category : str
            An event category given by the user.
        name : str
            An event name given by the user.
        message : str
            An event message given by the user.
        kwargs
            Other information to record in the event. A ``timestamp``
            keyword, if present, overrides the auto-generated timestamp
            (used when deserializing existing events).
        """
        self.source = source
        self.category = category
        self.name = name
        self.message = message
        # Record the concrete class name so deserialize_event() can
        # reconstruct the correct type.
        self.event_class = self.__class__.__name__
        if "timestamp" in kwargs:
            self.timestamp = kwargs.pop("timestamp")
        else:
            self.timestamp = str(datetime.now())
        self.data = kwargs

    def base_field_names(self):
        """Return the base field names for the event.

        Returns
        -------
        list
        """
        return self._base_field_names()

    @staticmethod
    def _base_field_names():
        return ["timestamp", "source", "message"]

    def field_names(self):
        """Return all field names for the event.

        Returns
        -------
        list
        """
        return self.base_field_names() + list(self.data.keys())

    def values(self):
        """Return the values for all fields in the event.

        Returns
        -------
        list
        """
        # Account for events generated with different versions of code.
        values = [getattr(self, x, "") for x in self.base_field_names()]
        values += list(self.data.values())
        return values

    @classmethod
    def deserialize(cls, record):
        """Deserialize event from JSON.

        Parameters
        ----------
        record : dict

        Returns
        -------
        StructuredLogEvent
        """
        return cls(
            source=record.get("source", ""),
            category=record.get("category", ""),
            name=record.get("name", ""),
            message=record.get("message", ""),
            timestamp=record.get("timestamp", ""),
            # Tolerate records written without a data field, consistent
            # with the .get() defaults used for every other field.
            **record.get("data", {}),
        )

    def __str__(self):
        """Format the event instance as a JSON string."""
        return json.dumps(self.__dict__, sort_keys=True)

    def to_dict(self):
        """Convert event object to dict.

        Note: returns the live ``__dict__``; callers must not mutate it.
        """
        return self.__dict__
class StructuredErrorLogEvent(StructuredLogEvent):
    """Event specific to exceptions."""

    def __init__(self, source, category, name, message, **kwargs):
        """Must be called in an exception context."""
        super().__init__(source, category, name, message, **kwargs)
        # A deserialized record already carries exception details in
        # kwargs; only parse the live traceback for newly raised errors.
        if "exception" not in kwargs:
            self._parse_traceback()

    def base_field_names(self):
        """Return the base field names for the event."""
        return self._base_field_names()

    def _parse_traceback(self):
        """Record the active exception's type, message, filename, and
        line number into the event data."""
        err_type, err_value, traceback = sys.exc_info()
        assert traceback is not None, "must be called in an exception context"
        self.data["exception"] = str(err_type)
        self.data["error"] = str(err_value).strip()
        self.data["filename"] = os.path.basename(traceback.tb_frame.f_code.co_filename)
        self.data["lineno"] = traceback.tb_lineno
def deserialize_event(data):
    """Construct an event from raw data.

    Parameters
    ----------
    data : dict

    Returns
    -------
    StructuredLogEvent

    Raises
    ------
    Exception
        Raised if the record's event class is not recognized.
    """
    # Dispatch on the serialized class name.
    event_classes = {
        "StructuredLogEvent": StructuredLogEvent,
        "StructuredErrorLogEvent": StructuredErrorLogEvent,
    }
    cls = event_classes.get(data["event_class"])
    if cls is None:
        raise Exception(f"unknown event class {data['event_class']}")
    return cls.deserialize(data)
class EventsSummary:
    """Provides summary of all events in a JADE output directory.

    On first construction the per-run ``*events.log`` files are
    consolidated into one file per event name inside the events
    directory: resource-utilization stats are written as Parquet, all
    other events as JSON. Later constructions load those files, either
    eagerly (``preload=True``) or on demand by name.
    """

    # Event names persisted as Parquet dataframes instead of JSON.
    RESOURCE_STATS = {
        EVENT_NAME_CPU_STATS,
        EVENT_NAME_DISK_STATS,
        EVENT_NAME_MEMORY_STATS,
        EVENT_NAME_NETWORK_STATS,
        EVENT_NAME_PROCESS_STATS,
    }

    def __init__(self, output_dir, preload=False, optimize_resoure_stats=True):
        """
        Initialize EventsSummary class

        Parameters
        ----------
        output_dir: str
            Path of jade output directory.
        preload: bool
            Load all events into memory; otherwise, load by name on demand.
        optimize_resoure_stats: bool
            Stored but not otherwise read by this class. The misspelled
            parameter name is preserved for backward compatibility.
        """
        self._optimize_resource_stats = optimize_resoure_stats
        self._events = defaultdict(list)
        self._output_dir = output_dir
        self._event_dir = Path(output_dir) / EVENTS_DIR
        self._event_dir.mkdir(exist_ok=True)
        self._job_outputs_dir = os.path.join(output_dir, JOBS_OUTPUT_DIR)
        event_files = list(self._event_dir.iterdir())
        if not event_files:
            # First access for this output directory: merge the raw log
            # files and persist one file per event name.
            self._consolidate_events()
            self._save_events_summary()
        elif preload:
            self._load_all_events()
        # else, events have already been consolidated, load them on demand

    def _iter_event_files(self):
        """Yield the raw event log files in the output directory."""
        return Path(self._output_dir).glob("*events.log")

    def _consolidate_events(self):
        """Find most recent event log files, and merge event data together."""
        # PERF: This could easily be parallelized across processes. Need to be sure we aren't on
        # a login node.
        for event_file in self._iter_event_files():
            with open(event_file, "r") as f:
                for line in f:
                    event = deserialize_event(json.loads(line))
                    self._events[event.name].append(event)
        # Keep each event list in chronological order.
        for events in self._events.values():
            events.sort(key=lambda x: x.timestamp)

    def _deserialize_events(self, name, path):
        """Load the JSON event file at path into the in-memory cache."""
        self._events[name] = [deserialize_event(x) for x in load_data(path)]

    def _get_events(self, name):
        """Return events with name, loading from disk if not cached."""
        if name not in self._events:
            self._load_event_file(name)
        return self._events.get(name, [])

    def _load_all_events(self):
        """Load every non-resource-stat event file into memory."""
        for filename in self._event_dir.iterdir():
            name = filename.stem
            if name not in self._events and name not in self.RESOURCE_STATS:
                self._deserialize_events(name, filename)

    def _load_event_file(self, name):
        """Load one event file by name; resource stats are Parquet-only."""
        if name in self.RESOURCE_STATS:
            raise Exception(f"event {name} is only available in Parquet")
        filename = self._make_event_filename(name)
        if filename.exists():
            self._deserialize_events(name, filename)

    def _make_data_filename(self, name):
        """Return the Parquet filename for a resource-stat event name."""
        return self._event_dir / (name + ".parquet")

    def _make_event_filename(self, name):
        """Return the JSON filename for an event name."""
        return self._event_dir / (name + ".json")

    def _save_events_summary(self):
        """Save events to one file per event name."""
        for name, events in self._events.items():
            if name in self.RESOURCE_STATS:
                dict_events = []
                for event in events:
                    if name == EVENT_NAME_PROCESS_STATS:
                        # One row per monitored process per sample.
                        for process in event.data["processes"]:
                            row = {"timestamp": event.timestamp, "source": event.source}
                            row.update(process)
                            dict_events.append(row)
                    else:
                        row = {"timestamp": event.timestamp, "source": event.source}
                        row.update(event.data)
                        dict_events.append(row)
                df = pd.DataFrame.from_records(dict_events, index="timestamp")
                df.to_parquet(self._make_data_filename(name))
            else:
                dict_events = [event.to_dict() for event in events]
                dump_data(dict_events, self._make_event_filename(name))
        # Resource stats are now only available via Parquet; free the memory.
        for name in self.RESOURCE_STATS:
            self._events.pop(name, None)

    def get_bytes_consumed(self):
        """Return a sum of all bytes_consumed events.

        Returns
        -------
        int
            Size in bytes of files produced by all jobs
        """
        return sum(
            event.data["bytes_consumed"]
            for event in self.iter_events(EVENT_NAME_BYTES_CONSUMED)
        )

    def get_config_exec_time(self):
        """Return the total number of seconds to run all jobs in the config.

        Returns
        -------
        int

        Raises
        ------
        InvalidConfiguration
            Raised if no batch summary events exist.
        """
        events = self.list_events(EVENT_NAME_CONFIG_EXEC_SUMMARY)
        if not events:
            raise InvalidConfiguration("no batch summary events found")
        return events[0].data["config_execution_time"]

    def get_dataframe(self, name):
        """Return the dataframe for this event name. Only applicable to resource stats."""
        filename = self._make_data_filename(name)
        if not filename.exists():
            # No stats were recorded for this name; return an empty frame.
            return pd.DataFrame()
        return pd.read_parquet(filename)

    def iter_events(self, name):
        """Return a generator over events with name.

        Parameters
        ----------
        name : str

        Yields
        ------
        event : StructuredLogEvent
        """
        yield from self._get_events(name)

    def list_events(self, name):
        """Return the events of type name.

        Returns
        -------
        list
            list of StructuredLogEvent
        """
        return list(self._get_events(name))

    def list_unique_categories(self):
        """Return the unique event categories in the log. Will cause all events
        to get loaded into memory.

        Returns
        -------
        list
        """
        self._load_all_events()
        # All events with the same name share a category; inspect the first.
        categories = {events[0].category for events in self._events.values() if events}
        return sorted(categories)

    def list_unique_names(self):
        """Return the sorted unique event names in the log.

        Returns
        -------
        list
        """
        # Use the stem so returned values are event names (e.g.,
        # "hpc_submit"), not filenames ("hpc_submit.json").
        return sorted(x.stem for x in self._event_dir.iterdir() if x.suffix == ".json")

    def show_events(self, name):
        """Print tabular events in terminal"""
        table = PrettyTable()
        field_names = None
        count = 0
        for event in self.iter_events(name):
            if field_names is None:
                # Assume all events of one name share the same fields.
                field_names = event.field_names()
            table.add_row(event.values())
            count += 1
        if count == 0:
            print(f"No events of type {name}")
            return
        table.field_names = field_names
        print(f"Events of type {name} from directory: {self._output_dir}")
        print(table)
        print(f"Total events: {count}\n")

    def show_events_in_category(self, category):
        """Print tabular events matching category in terminal. Will cause all
        events to get loaded into memory.
        """
        event_names = []
        self._load_all_events()
        for name, events in self._events.items():
            if not events:
                continue
            if events[0].category == category:
                event_names.append(name)
        if not event_names:
            print(f"There are no events in category {category}")
            return
        for event_name in sorted(event_names):
            self.show_events(event_name)

    def show_event_categories(self):
        """Show the unique event categories in the log."""
        # Fixed typo in user-facing output ("Catgories" -> "Categories").
        print("Categories: {}".format(" ".join(self.list_unique_categories())))

    def show_event_names(self):
        """Show the unique event names in the log."""
        print("Names: {}".format(" ".join(self.list_unique_names())))

    def to_json(self):
        """Return all events in JSON format.

        Returns
        -------
        str
        """
        self._load_all_events()
        return json.dumps(
            [x.to_dict() for events in self._events.values() for x in events], indent=2
        )