# Source code for nrel.routee.powertrain.core.model

from __future__ import annotations

from dataclasses import dataclass
import json
from math import isinf
from pathlib import Path
from typing import Dict, List, Optional, TYPE_CHECKING, Union
from urllib import request

import pandas as pd
from nrel.routee.powertrain.core.features import (
    FeatureSetId,
    feature_id_to_names,
    feature_names_to_id,
)

from nrel.routee.powertrain.core.metadata import Metadata
from nrel.routee.powertrain.core.real_world_adjustments import ADJUSTMENT_FACTORS
from nrel.routee.powertrain.estimators.estimator_interface import Estimator
from nrel.routee.powertrain.estimators.onnx import ONNXEstimator
from nrel.routee.powertrain.estimators.smart_core import SmartCoreEstimator
from nrel.routee.powertrain.estimators.ngboost_estimator import NGBoostEstimator

from nrel.routee.powertrain.validation.feature_visualization import (
    contour_plot,
    visualize_features,
)
from nrel.routee.powertrain.validation.errors import ModelErrors

if TYPE_CHECKING:
    from pandas import Series

# Registry of the estimator classes a serialized model may reference.
# Model.to_dict stores each estimator's class name
# (``estimator.__class__.__name__``) and Model.from_dict looks that name up
# here to find the class whose ``from_dict`` rebuilds the estimator, so the
# keys are part of the on-disk format.
REGISTERED_ESTIMATORS = {
    "ONNXEstimator": ONNXEstimator,
    "SmartCoreEstimator": SmartCoreEstimator,
    "NGBoostEstimator": NGBoostEstimator,
}

# Keys used in the dictionary (and therefore JSON) form of a Model.
# Top level: the model metadata.
METADATA_SERIALIZATION_KEY = "metadata"
# Top level: the model errors.
MODEL_ERRORS_SERIALIZATION_KEY = "errors"
# Top level: mapping of feature set id -> serialized estimator entry.
ALL_ESTIMATOR_SERIALIZATION_KEY = "all_estimators"
# Inside each estimator entry: the estimator's own serialized payload.
ESTIMATOR_SERIALIZATION_KEY = "estimator"
# Inside each estimator entry (and also written at the top level by
# Model.to_dict): the name of the class that produced the entry.
CONSTRUCTOR_TYPE_SERIALIZATION_KEY = "estimator_constructor_type"


@dataclass
class Model:
    """
    A RouteE-Powertrain vehicle model represents a single vehicle
    (i.e. a 2016 Toyota Camry with a 1.5 L gasoline engine).

    Attributes:
        estimators: the estimators of this model, keyed by feature set id
        metadata: the model metadata; its ``config`` describes the features,
            distance, targets, powertrain type and predict method
        errors: the model errors, looked up per estimator and per target
    """

    estimators: Dict[FeatureSetId, Estimator]
    metadata: Metadata
    errors: ModelErrors

    @property
    def feature_sets(self):
        """
        The feature sets declared in the model config.
        """
        return self.metadata.config.feature_sets

    @property
    def feature_set_lists(self) -> List[List[str]]:
        """
        The feature names of every estimator, one list per estimator.
        """
        return [feature_id_to_names(fid) for fid in self.estimators]

    @classmethod
    def from_dict(cls, input_dict: dict) -> Model:
        """
        Load a vehicle model from a python dictionary

        Args:
            input_dict: a dictionary with the layout produced by ``to_dict``

        Returns:
            a powertrain vehicle

        Raises:
            ValueError: if the metadata, errors or estimators section is
                missing, if an estimator entry has no constructor type or
                no estimator payload, or if the constructor type is not
                registered
        """
        metadata_dict = input_dict.get(METADATA_SERIALIZATION_KEY)
        if metadata_dict is None:
            raise ValueError(
                "Model file must contain metadata at key: "
                f"'{METADATA_SERIALIZATION_KEY}'"
            )
        metadata = Metadata.from_dict(metadata_dict)

        model_errors_dict = input_dict.get(MODEL_ERRORS_SERIALIZATION_KEY)
        if model_errors_dict is None:
            raise ValueError(
                "Model file must contain model errors at key: "
                f"'{MODEL_ERRORS_SERIALIZATION_KEY}'"
            )
        model_errors = ModelErrors.from_dict(model_errors_dict)

        all_estimators_dict = input_dict.get(ALL_ESTIMATOR_SERIALIZATION_KEY)
        if all_estimators_dict is None:
            raise ValueError(
                "Model file must contain estimator data at key: "
                f"'{ALL_ESTIMATOR_SERIALIZATION_KEY}'"
            )

        estimators = {}
        for feature_set_id, ed in all_estimators_dict.items():
            # The constructor type is validated per estimator entry: it is
            # the value actually used for the registry lookup below.
            constructor_type = ed.get(CONSTRUCTOR_TYPE_SERIALIZATION_KEY)
            if constructor_type is None:
                raise ValueError(
                    "Model file must contain estimator constructor at key: "
                    f"'{CONSTRUCTOR_TYPE_SERIALIZATION_KEY}'"
                )

            estimator_constructor = REGISTERED_ESTIMATORS.get(constructor_type)
            if estimator_constructor is None:
                raise ValueError(
                    f"Estimator constructor type '{constructor_type}' "
                    "is not registered"
                )

            estimator_input_dict = ed.get(ESTIMATOR_SERIALIZATION_KEY)
            if estimator_input_dict is None:
                raise ValueError(
                    "Model file must contain estimator data at key: "
                    f"'{ESTIMATOR_SERIALIZATION_KEY}'"
                )

            estimators[feature_set_id] = estimator_constructor.from_dict(
                estimator_input_dict
            )

        return cls(estimators, metadata, model_errors)

    def to_dict(self) -> dict:
        """
        Convert model to a dictionary

        Returns:
            a dictionary holding the metadata, the errors and, per feature
            set id, the serialized estimator together with the name of the
            class that produced it
        """
        estimator_dict = {
            feature_set_id: {
                ESTIMATOR_SERIALIZATION_KEY: estimator.to_dict(),
                CONSTRUCTOR_TYPE_SERIALIZATION_KEY: estimator.__class__.__name__,
            }
            for feature_set_id, estimator in self.estimators.items()
        }

        return {
            METADATA_SERIALIZATION_KEY: self.metadata.to_dict(),
            MODEL_ERRORS_SERIALIZATION_KEY: self.errors.to_dict(),
            ALL_ESTIMATOR_SERIALIZATION_KEY: estimator_dict,
            # from_dict does not need this top-level entry; it is still
            # written so the output layout stays unchanged.
            CONSTRUCTOR_TYPE_SERIALIZATION_KEY: self.estimators.__class__.__name__,
        }

    @classmethod
    def from_file(cls, file: Union[str, Path]) -> Model:
        """
        Load a vehicle model from a file.

        Args:
            file: the path to the file to load

        Returns:
            a powertrain vehicle

        Raises:
            ValueError: if the file does not have a ``.json`` suffix
        """
        path = Path(file)

        if path.suffix != ".json":
            raise ValueError("Model file must be a .json file")

        # JSON text is UTF-8; don't depend on the platform default encoding.
        with path.open("r", encoding="utf-8") as f:
            input_dict = json.load(f)

        return cls.from_dict(input_dict)

    @classmethod
    def from_url(cls, url: str) -> Model:
        """
        Attempts to read a file from a url.

        Args:
            url: the url to download the file from

        Returns:
            a powertrain vehicle
        """
        with request.urlopen(url) as u:
            in_dict = json.load(u)

        return cls.from_dict(in_dict)

    def to_file(self, file: Union[str, Path]):
        """
        Save a vehicle model to a file.

        Args:
            file: the path to the file to save to

        Raises:
            ValueError: if the file does not have a ``.json`` suffix
        """
        path = Path(file)

        if path.suffix != ".json":
            raise ValueError("Model file must be a .json file")

        output_dict = self.to_dict()

        with path.open("w", encoding="utf-8") as f:
            json.dump(output_dict, f)

    def _feature_ranges(
        self,
        estimator_id: FeatureSetId,
        n_samples: Optional[int],
        visualization_helper: str,
    ) -> Dict[str, dict]:
        """
        Build the per-feature sampling ranges shared by the plotting methods.

        Args:
            estimator_id: the estimator id whose feature set is sampled
            n_samples: the number of samples stored for every feature
            visualization_helper: name of the function in
                nrel.routee.powertrain.validation.feature_visualization that
                the error message points to for custom constraints

        Returns:
            a dictionary keyed by feature name with the keys "upper",
            "lower" and "n_samples"

        Raises:
            KeyError: if the model config has no matching feature set
            ValueError: if a feature has an infinite lower or upper bound
        """
        feature_names = feature_id_to_names(estimator_id)
        feature_set = self.metadata.config.get_feature_set(feature_names)
        if feature_set is None:
            raise KeyError(
                f"Model does not have a feature set with the features: {feature_names}"
            )

        feature_ranges = {}
        for f in feature_set.features:
            # an unbounded feature cannot be sampled over a finite grid
            if isinf(f.constraints.upper) or isinf(f.constraints.lower):
                raise ValueError(
                    f"Feature: {f.name} has constraints with positive/negative infinity in the lower/upper bound. "
                    f"You can add constraints when training a model or set custom constraints during visualization using "
                    f"nrel.routee.powertrain.validation.feature_visualization.{visualization_helper}"
                )
            feature_ranges[f.name] = {
                "upper": f.constraints.upper,
                "lower": f.constraints.lower,
                "n_samples": n_samples,
            }
        return feature_ranges

    def visualize_features(
        self,
        estimator_id: FeatureSetId,
        n_samples: Optional[int] = 100,
        output_path: Optional[str] = None,
        return_predictions: Optional[bool] = False,
    ) -> Optional[Dict[str, "Series"]]:
        """
        generates test links to independently test the model's features
        and creates plots of those predictions for the given estimator id

        Args:
            estimator_id: the estimator id for generating the plots
            n_samples: the number of samples used to generate the plots
            output_path: an optional path to save the plots as png files.
            return_predictions: if true, returns the dictionary containing
                the prediction values

        Returns:
            optionally returns a dictionary containing the predictions
            where the key is the feature tested

        Raises:
            KeyError: if the model has no feature set for the estimator id
            ValueError: if a feature has an infinite lower or upper bound
        """
        feature_ranges = self._feature_ranges(
            estimator_id, n_samples, "visualize_features"
        )

        return visualize_features(
            model=self,
            feature_ranges=feature_ranges,
            output_path=output_path,
            return_predictions=return_predictions,
        )

    def contour(
        self,
        estimator_id: FeatureSetId,
        x_feature: str,
        y_feature: str,
        n_samples: Optional[int] = 100,
        output_path: Optional[str] = None,
    ):
        """
        generates a contour plot of the two test features:
        x_feature and y_feature. for the given estimator id

        Args:
            estimator_id: the estimator id for generating the plots
            x_feature: one of the features used to generate the energy
                matrix and will be the x-axis feature
            y_feature: one of the features used to generate the energy
                matrix and will be the y-axis feature
            n_samples: the number of samples used to generate the plots
            output_path: an optional path to save the plots as png files.

        Raises:
            KeyError: if the model has no feature set for the estimator id
            ValueError: if a feature has an infinite lower or upper bound
        """
        feature_ranges = self._feature_ranges(estimator_id, n_samples, "contour_plot")

        return contour_plot(
            model=self,
            x_feature=x_feature,
            y_feature=y_feature,
            feature_ranges=feature_ranges,
            output_path=output_path,
        )

    def predict(
        self,
        links_df: pd.DataFrame,
        feature_columns: Optional[List[str]] = None,
        distance_column: Optional[str] = None,
        apply_real_world_adjustment: bool = True,
    ) -> pd.DataFrame:
        """
        Predict absolute energy consumption for each link

        Args:
            links_df: a dataframe containing the links to predict on
            feature_columns: the features to use for prediction
            distance_column: the column to use for distance
            apply_real_world_adjustment: whether to apply a real world
                adjustment to the predicted energy consumption

        Returns:
            a dataframe containing the predicted energy consumption for
            each link

        Raises:
            ValueError: if the distance column is missing, if no estimator
                or feature set matches the requested features, or if no
                adjustment factor exists for the model's powertrain type
        """
        config = self.metadata.config
        model_distance_name = config.distance.name

        if distance_column is None:
            distance_column = model_distance_name

        if distance_column not in links_df.columns:
            raise ValueError(
                f"links_df must contain a distance column named: '{distance_column}'"
            )
        else:
            # rename returns a new frame, so the caller's dataframe keeps
            # its own column names
            links_df = links_df.rename(columns={distance_column: model_distance_name})

        # if we only have one estimator, just use that
        if len(self.estimators) == 1:
            feature_set_id = list(self.estimators.keys())[0]
            estimator = self.estimators.get(feature_set_id)
            if estimator is None:
                raise ValueError("Could not find estimator")

        # if no explicit feature names are supplied we assume that the
        # dataframe contains all the features needed for prediction;
        # if that isn't the case, we throw an error
        elif feature_columns is None:
            # The distance column now carries the model's distance name
            # (see the rename above), so that is the name to leave out.
            feature_columns = [c for c in links_df.columns if c != model_distance_name]
            feature_set_id = feature_names_to_id(feature_columns)
            estimator = self.estimators.get(feature_set_id)
            if estimator is None:
                raise ValueError(
                    "This model has multiple feature sets and no features were "
                    "explicitly provided. "
                    "We attempted to just use the columns in the incoming dataframe "
                    "but we couldn't find an estimator that matches the features: "
                    f"{feature_columns}. "
                    "Please provide an explicit list of feature names to the "
                    "feature_columns parameter of the predict method or provide a "
                    "dataframe that only contains the features you want to use. \n"
                    "Here are the feature sets that can be used: "
                    f"{self.feature_set_lists}"
                )
        else:
            feature_set_id = feature_names_to_id(feature_columns)
            estimator = self.estimators.get(feature_set_id)
            if estimator is None:
                raise ValueError(
                    "Could not find an estimator that matches the provided "
                    f"feature columns {feature_columns}. Here are the feature "
                    f"sets that can be used: {self.feature_set_lists}"
                )

        feature_set = config.feature_set_map.get(feature_set_id)
        if feature_set is None:
            raise ValueError(
                f"Could not find a feature set {feature_set_id} in model config"
            )

        pred_energy_df = estimator.predict(
            links_df,
            feature_set,
            config.distance,
            config.target,
            config.predict_method,
        )

        if apply_real_world_adjustment:
            # the factor depends only on the powertrain type, so it is
            # looked up once and applied to every target column
            adjustment_factor = ADJUSTMENT_FACTORS.get(config.powertrain_type)
            if adjustment_factor is None:
                raise ValueError(
                    f"Could not find an adjustment factor for powertrain type "
                    f"{config.powertrain_type}"
                )
            for energy in config.target.targets:
                pred_energy_df[energy.name] = (
                    pred_energy_df[energy.name] * adjustment_factor
                )

        return pred_energy_df

    def __repr__(self) -> str:
        """
        Shows a nice pretty printed summary of the model including:
        - Model average fuel consumption
        - Select set of errors
        - Expected features and their units
        - Powertrain specifications

        Raises:
            ValueError: if the errors or the feature set of an estimator,
                or the errors of a target, cannot be found
        """
        config = self.metadata.config

        summary_lines = []

        summary_lines.append("=" * 40)
        summary_lines.append("Model Summary")
        summary_lines.append("-" * 20)
        summary_lines.append(f"Vehicle description: {config.vehicle_description}")
        summary_lines.append(f"Powertrain type: {config.powertrain_type.name}")
        summary_lines.append(f"Number of estimators: {len(self.estimators)}")
        summary_lines.append("=" * 40)

        for feature_set_id in self.estimators:
            estimator_errors = self.errors.estimator_errors.get(feature_set_id)
            if estimator_errors is None:
                raise ValueError(
                    f"Could not find errors for estimator {feature_set_id}"
                )

            summary_lines.append("Estimator Summary")
            summary_lines.append("-" * 20)

            feature_set = config.feature_set_map.get(feature_set_id)
            if feature_set is None:
                raise ValueError(
                    f"Could not find a feature set {feature_set_id} in model config"
                )

            for feature in feature_set.features:
                summary_lines.append(f"Feature: {feature.name} ({feature.units})")

            summary_lines.append(
                f"Distance: {config.distance.name} ({config.distance.units})"
            )

            for target in config.target.targets:
                summary_lines.append(f"Target: {target.name} ({target.units})")
                target_errors = estimator_errors.error_by_target.get(target.name)
                if target_errors is None:
                    raise ValueError(f"Could not find errors for target {target.name}")
                summary_lines.append(
                    f"Raw Predicted Consumption: {target_errors.pred_dist_per_energy:.3f} "
                    f"({config.distance.units}/{target.units})"
                )
                summary_lines.append(
                    f"Real World Predicted Consumption: {target_errors.real_world_pred_dist_per_energy:.3f} "
                    f"({config.distance.units}/{target.units})"
                )

            summary_lines.append(
                f"Predict Method: {config.predict_method.value.upper()}"
            )
            summary_lines.append("=" * 40)

        return "\n".join(summary_lines)

    def _repr_html_(self) -> str:
        """
        Returns an html table of the model summary for display in a notebook

        Raises:
            ValueError: if the errors or the feature set of an estimator,
                or the errors of a target, cannot be found
        """
        # local import: only this method needs it
        from html import escape

        def cell(value) -> str:
            # Values come from the model file, which may have been
            # downloaded, so they are escaped before being placed in markup.
            return escape(f"{value}")

        config = self.metadata.config

        # Start the HTML table
        html_lines = ['<table border="1" style="border-collapse: collapse;">']

        # Title: Model Summary
        html_lines.append(
            '<tr><th colspan="2" style="border-bottom: 2px solid black; text-align: center;">Model Summary</th></tr>'
        )
        html_lines.append(
            f"<tr><td>Vehicle description</td><td>{cell(config.vehicle_description)}</td></tr>"
        )
        html_lines.append(
            f"<tr><td>Powertrain type</td><td>{cell(config.powertrain_type.name)}</td></tr>"
        )
        html_lines.append(
            f"<tr><td>Number of estimators</td><td>{len(self.estimators)}</td></tr>"
        )

        for feature_set_id in self.estimators:
            estimator_errors = self.errors.estimator_errors.get(feature_set_id)
            if estimator_errors is None:
                raise ValueError(
                    f"Could not find errors for estimator {feature_set_id}"
                )

            # Title: Estimator Summary
            html_lines.append(
                '<tr><th colspan="2" style="border-bottom: 2px solid black; text-align: center;">Estimator Summary</th></tr>'
            )

            feature_set = config.feature_set_map.get(feature_set_id)
            if feature_set is None:
                raise ValueError(
                    f"Could not find a feature set {feature_set_id} in model config"
                )

            for feature in feature_set.features:
                html_lines.append(
                    f"<tr><td>Feature</td><td>{cell(feature.name)} ({cell(feature.units)})</td></tr>"
                )

            html_lines.append(
                "<tr><td>Distance</td>"
                f"<td>{cell(config.distance.name)} ({cell(config.distance.units)})</td></tr>"
            )

            for target in config.target.targets:
                html_lines.append(
                    f"<tr><td>Target</td><td>{cell(target.name)} ({cell(target.units)})</td></tr>"
                )
                target_errors = estimator_errors.error_by_target.get(target.name)
                if target_errors is None:
                    raise ValueError(f"Could not find errors for target {target.name}")
                html_lines.append(
                    "<tr><td>Predicted Consumption</td>"
                    f"<td>{target_errors.pred_dist_per_energy:.3f} "
                    f"({cell(config.distance.units)}/{cell(target.units)})</td></tr>"
                )
                html_lines.append(
                    "<tr><td>Real World Predicted Consumption</td>"
                    f"<td>{target_errors.real_world_pred_dist_per_energy:.3f} "
                    f"({cell(config.distance.units)}/{cell(target.units)})</td></tr>"
                )

            html_lines.append(
                f"<tr><td>Predict Method</td>"
                f"<td>{cell(config.predict_method.value.upper())}</td></tr>"
            )

        # End the HTML table
        html_lines.append("</table>")

        return "".join(html_lines)