Source code for jade.spark.metrics

"""Generates metrics from a Spark cluster."""

import logging
import requests
from pathlib import Path

from jade.exceptions import ExecutionError
from jade.utils.utils import dump_data


logger = logging.getLogger(__name__)


[docs] class SparkMetrics: """Records metrics from a Spark cluster""" METRICS_FILE = "metrics.json" def __init__(self, manager_node, history=False): self._manager_node = manager_node port = "18080" if history else "4040" self._endpoint = f"http://{manager_node}:{port}/api/v1/applications/" def _submit_request(self, cmd, *args): if not cmd.endswith("/"): cmd += "/" if args: cmd = cmd + "/".join(args) logger.info("Submitting %s", cmd) response = requests.get(cmd) if response.status_code != 200: raise ExecutionError(f"{cmd} failed: status_code={response.status_code}") return response.json()
[docs] def generate_metrics(self, output_dir: Path): """Generate metrics from a Spark cluster into files at the given path. Parameters ---------- cluster: str Cluster master node name output_dir: Path Output directory in which to write files """ output_dir.mkdir(exist_ok=True, parents=True) apps = self.list_applications() results = {"metrics": []} for app in apps: metrics = { "application": app, "executors": self._submit_request(self._endpoint, app["id"], "executors"), "jobs": self._submit_request(self._endpoint, app["id"], "jobs"), } results["metrics"].append(metrics) filename = output_dir / self.METRICS_FILE dump_data(results, filename, indent=2) logger.info("Recorded metrics in %s", filename)
[docs] def list_applications(self): """Return the applications in the cluster. Returns ------- list List of application objects """ return self._submit_request(self._endpoint)