# Source code for phygnn.model_interfaces.base_model

# -*- coding: utf-8 -*-
"""
Base Model Interface
"""
from abc import ABC
import random
import copy
import pprint
import logging
import numpy as np
import pandas as pd
import tensorflow as tf
from warnings import warn

from phygnn.utilities import TF2, VERSION_RECORD
from phygnn.utilities.pre_processing import PreProcess

logger = logging.getLogger(__name__)


class ModelBase(ABC):
    """
    Base Model Interface

    Wraps a model object together with the bookkeeping needed to feed it
    data: ordered feature/label names, per-name normalization parameters
    (mean, stdev), and optional one-hot encoding categories.
    """

    def __init__(self, model, feature_names=None, label_names=None,
                 norm_params=None, normalize=(True, False),
                 one_hot_categories=None):
        """
        Parameters
        ----------
        model : OBJ
            Initialized model object
        feature_names : list
            Ordered list of feature names.
        label_names : list
            Ordered list of label (output) names.
        norm_params : dict, optional
            Dictionary mapping feature and label names (keys) to
            normalization parameters (mean, stdev), by default None
        normalize : bool | tuple, optional
            Boolean flag(s) as to whether features and labels should be
            normalized. Possible values:
            - True means normalize both
            - False means don't normalize either
            - Tuple of flags (normalize_feature, normalize_label)
            by default (True, False)
        one_hot_categories : dict, optional
            Features to one-hot encode using given categories, if None do
            not run one-hot encoding, by default None
        """
        self._model = model
        self._feature_names = self._as_name_list(feature_names)
        self._label_names = self._as_name_list(label_names)

        if norm_params is None:
            norm_params = {}

        self._norm_params = norm_params
        self._normalize = self._parse_normalize(normalize)

        if one_hot_categories is not None:
            PreProcess.check_one_hot_categories(one_hot_categories)

        self._one_hot_categories = one_hot_categories

        self._version_record = VERSION_RECORD
        logger.info('Active python environment versions: \n{}'
                    .format(pprint.pformat(self._version_record, indent=4)))

    def __repr__(self):
        msg = "{}:\n{}".format(self.__class__.__name__, self.model_summary)

        return msg

    def __getitem__(self, features):
        """
        Use model to predict label from given features

        Parameters
        ----------
        features : pandas.DataFrame
            features to predict from

        Returns
        -------
        pandas.DataFrame
            label prediction
        """
        return self.predict(features)

    @staticmethod
    def _as_name_list(names):
        """
        Coerce a single name or an array/Index of names to a list

        Parameters
        ----------
        names : str | list | numpy.ndarray | pandas.Index | None
            Name(s) as supplied by the caller

        Returns
        -------
        names : list | None
            A str is wrapped in a list, ndarray / Index inputs are
            converted with .tolist(), anything else is returned unchanged
        """
        if isinstance(names, str):
            names = [names]
        elif isinstance(names, (np.ndarray, pd.Index)):
            names = names.tolist()

        return names

    @property
    def version_record(self):
        """A record of important versions that this model was built with.

        Returns
        -------
        dict
        """
        return self._version_record

    @property
    def model_summary(self):
        """
        Tensorflow model summary

        Returns
        -------
        str | None
            Whatever self._model.summary() returns, or None if that call
            raises a ValueError
        """
        try:
            summary = self._model.summary()
        except ValueError:
            summary = None

        return summary

    @property
    def normalize_features(self):
        """
        Flag to normalize features

        Returns
        -------
        bool
        """
        return self._normalize[0]

    @property
    def feature_names(self):
        """
        List of the feature variable names.

        Returns
        -------
        list
        """
        return self._feature_names

    @property
    def feature_dims(self):
        """
        Number of features

        Returns
        -------
        int | None
            None if the feature names are not (yet) known
        """
        n_features = (len(self.feature_names)
                      if self.feature_names is not None else None)

        return n_features

    @property
    def normalize_labels(self):
        """
        Flag to normalize labels

        Returns
        -------
        bool
        """
        return self._normalize[1]

    @property
    def label_names(self):
        """
        label variable names

        Returns
        -------
        list
        """
        return self._label_names

    @property
    def label_dims(self):
        """
        Number of labels

        Returns
        -------
        int | None
            None if the label names are not (yet) known
        """
        n_labels = (len(self.label_names)
                    if self.label_names is not None else None)

        return n_labels

    @property
    def normalization_parameters(self):
        """
        Features and label (un)normalization parameters

        Returns
        -------
        dict
        """
        return self._norm_params

    @property
    def means(self):
        """
        Mapping feature/label names to the mean values for
        (un)normalization

        Returns
        -------
        dict
        """
        means = {k: v['mean'] for k, v in self._norm_params.items()}

        return means

    @property
    def stdevs(self):
        """
        Mapping feature/label names to the stdev values for
        (un)normalization

        Returns
        -------
        dict
        """
        stdevs = {k: v['stdev'] for k, v in self._norm_params.items()}

        return stdevs

    @property
    def model(self):
        """
        Trained model

        Returns
        -------
        tensorflow.keras.models
        """
        return self._model

    @property
    def feature_means(self):
        """
        Feature means, used for (un)normalization

        Returns
        -------
        list | None
            One entry per feature name (None for names without stored
            parameters), or None if the feature names are unknown
        """
        means = None
        if self._feature_names is not None:
            means = [self.get_mean(f) for f in self._feature_names]

        return means

    @property
    def feature_stdevs(self):
        """
        Feature stdevs, used for (un)normalization

        Returns
        -------
        list | None
            One entry per feature name (None for names without stored
            parameters), or None if the feature names are unknown
        """
        stdevs = None
        if self._feature_names is not None:
            stdevs = [self.get_stdev(f) for f in self._feature_names]

        return stdevs

    @property
    def label_means(self):
        """
        label means, used for (un)normalization

        Returns
        -------
        list | None
            One entry per label name (None for names without stored
            parameters), or None if the label names are unknown
        """
        means = None
        if self.label_names is not None:
            means = [self.get_mean(l_n) for l_n in self.label_names]

        return means

    @property
    def label_stdevs(self):
        """
        label stdevs, used for (un)normalization

        Returns
        -------
        list | None
            One entry per label name (None for names without stored
            parameters), or None if the label names are unknown
        """
        stdevs = None
        if self.label_names is not None:
            stdevs = [self.get_stdev(l_n) for l_n in self.label_names]

        return stdevs

    @property
    def input_feature_names(self):
        """
        Input feature names, i.e. the feature names before one-hot
        encoding

        The result is ordered deterministically: the non-encoded entries
        of feature_names in their stored order, followed by the one-hot
        input names in one_hot_categories order. Building the list from
        set arithmetic would give an arbitrary order, which matters
        because predict() assigns these names to array columns by
        position.

        NOTE(review): the original column position of a one-hot input is
        not stored on this class, so one-hot inputs are placed last -
        confirm against callers that pass bare arrays of input features.

        Returns
        -------
        list | None
            None if the feature names are unknown
        """
        if self._one_hot_categories is None or self.feature_names is None:
            return self.feature_names

        encoded = set(self.one_hot_feature_names)
        input_feature_names = [f for f in self.feature_names
                               if f not in encoded]

        seen = set(input_feature_names)
        for name in self.one_hot_input_feature_names:
            if name not in seen:
                input_feature_names.append(name)
                seen.add(name)

        return input_feature_names

    @property
    def one_hot_input_feature_names(self):
        """
        Input feature names to be one-hot encoded

        Returns
        -------
        list
        """
        return list(self.one_hot_categories.keys())

    @property
    def one_hot_feature_names(self):
        """
        One-hot encoded feature names

        Returns
        -------
        list
        """
        return [i for sub in self.one_hot_categories.values() for i in sub]

    @property
    def one_hot_categories(self):
        """
        categories to use for one-hot encoding

        Returns
        -------
        dict
            Empty dict if no one-hot categories were provided
        """
        if self._one_hot_categories is None:
            return {}
        else:
            return self._one_hot_categories

    @staticmethod
    def _parse_normalize(normalize):
        """
        Parse normalize flag(s)

        Parameters
        ----------
        normalize : bool | tuple
            Boolean flag(s) as to whether features and labels should be
            normalized. Possible values:
            - True means normalize both
            - False means don't normalize either
            - Tuple of flags (normalize_feature, normalize_label)

        Returns
        -------
        normalize : tuple
            Boolean flags (normalize_feature, normalize_label)

        Raises
        ------
        ValueError
            If a tuple/list is given that does not hold exactly 2 values
        TypeError
            If normalize is neither a bool nor a tuple/list
        """
        if isinstance(normalize, bool):
            normalize = (normalize, normalize)
        elif isinstance(normalize, (tuple, list)):
            if len(normalize) != 2:
                msg = ('Expecting only 2 values: '
                       '(normalize_feature, normalize_label), but {} values '
                       'were provided!: {}'.format(len(normalize), normalize))
                logger.error(msg)
                raise ValueError(msg)
        else:
            msg = ('"normalize" must be a boolean flag or a tuple/list: '
                   '(normalize_feature, normalize_label), but {} was '
                   'provided!: {}'
                   .format(type(normalize), normalize))
            logger.error(msg)
            raise TypeError(msg)

        return tuple(normalize)

    @staticmethod
    def dict_json_convert(inp):
        """Recursively convert numeric values in dict to work with json dump

        Parameters
        ----------
        inp : dict
            Dictionary to convert.

        Returns
        -------
        out : dict
            Copy of dict input with all nested numeric values converted to
            base python int or float, numpy booleans converted to bool, and
            all arrays converted to lists. Tuples are returned as lists.
            Dictionary keys are not converted.
        """
        if isinstance(inp, dict):
            out = {k: ModelBase.dict_json_convert(v) for k, v in inp.items()}
        elif isinstance(inp, (list, tuple)):
            out = [ModelBase.dict_json_convert(i) for i in inp]
        elif isinstance(inp, np.bool_):
            # numpy booleans are neither floating nor integer sub-dtypes and
            # are rejected by json.dump, so convert them explicitly
            out = bool(inp)
        elif np.issubdtype(type(inp), np.floating):
            out = float(inp)
        elif np.issubdtype(type(inp), np.integer):
            out = int(inp)
        elif isinstance(inp, np.ndarray):
            out = inp.tolist()
        else:
            out = inp

        return out

    @staticmethod
    def seed(s=0):
        """
        Set the random seed for reproducible results.

        Seeds the python random module, numpy, and tensorflow.

        Parameters
        ----------
        s : int
            Random number generator seed
        """
        random.seed(s)
        np.random.seed(s)
        if TF2:
            tf.random.set_seed(s)
        else:
            tf.random.set_random_seed(s)

    @staticmethod
    def _parse_data_names(data, names=None, fallback_prefix=''):
        """
        Parse data array and names from input data

        Parameters
        ----------
        data : pandas.DataFrame | dict | ndarray
            Features/labels to parse
        names : list, optional
            List of data item names, by default None
        fallback_prefix : str
            If data is provided as a numpy array without associated labels,
            the fallback_prefix will be used to name the enumerated feature
            dimension

        Returns
        -------
        data : ndarray
            Data array
        names: list
            List of data item names. For DataFrame / dict inputs the
            column names / keys always replace the names argument.
        """
        if isinstance(data, pd.DataFrame):
            names = data.columns.tolist()
            data = data.values
        elif isinstance(data, dict):
            names = list(data.keys())
            data = np.dstack(list(data.values()))[0]
        elif isinstance(data, np.ndarray) and names is None:
            names = [fallback_prefix + str(i)
                     for i in range(data.shape[-1])]

        return data, names

    @staticmethod
    def _get_item_number(arr):
        """
        Get number of items in array (labels or features)

        Parameters
        ----------
        arr : ndarray
            1 or 2D array

        Returns
        -------
        n : int
            Number of items: 1 for a 1D array, otherwise the size of the
            last axis
        """
        if len(arr.shape) == 1:
            n = 1
        else:
            n = arr.shape[-1]

        return n

    @staticmethod
    def make_one_hot_feature_names(feature_names, one_hot_categories):
        """
        Update feature_names after one-hot encoding

        Parameters
        ----------
        feature_names : list
            Input feature names. Any sequence is accepted; the input is
            never modified.
        one_hot_categories : dict
            Features to one-hot encode using given categories

        Returns
        -------
        one_hot_feature_names : list
            Updated list of feature names with one_hot categories. Each
            encoded input name is removed and its categories are appended
            at the end (skipping categories already present).
        """
        # list() rather than .copy() so tuples / arrays of names also work
        one_hot_feature_names = list(feature_names)
        for name, categories in one_hot_categories.items():
            if name in one_hot_feature_names:
                one_hot_feature_names.remove(name)

            for c in categories:
                if c not in one_hot_feature_names:
                    one_hot_feature_names.append(c)

        return one_hot_feature_names

    def get_norm_params(self, names):
        """
        Get means and stdevs for given feature/label names

        Parameters
        ----------
        names : list
            list of feature/label names to get normalization params for

        Returns
        -------
        means : list | None
            List of means to use for (un)normalization, None if the mean
            of any name is unavailable
        stdevs : list | None
            List of stdevs to use for (un)normalization, None if the stdev
            of any name is unavailable
        """
        means = []
        stdevs = []
        for name in names:
            means.append(self.get_mean(name))
            stdevs.append(self.get_stdev(name))

        if None in means:
            means = None

        if None in stdevs:
            stdevs = None

        return means, stdevs

    def get_mean(self, name):
        """
        Get feature | label mean

        Parameters
        ----------
        name : str
            feature | label name

        Returns
        -------
        mean : float | None
            Mean value used for normalization, None if not available
        """
        mean = self._norm_params.get(name, None)
        if mean is not None:
            mean = mean.get('mean', None)

        return mean

    def get_stdev(self, name):
        """
        Get feature | label stdev

        Parameters
        ----------
        name : str
            feature | label name

        Returns
        -------
        stdev : float | None
            Stdev value used for normalization, None if not available
        """
        stdev = self._norm_params.get(name, None)
        if stdev is not None:
            stdev = stdev.get('stdev', None)

        return stdev

    def _normalize_dict(self, items):
        """
        Normalize given dictionary of items (features | labels)

        One-hot encoded feature names are passed through unchanged. Items
        that cannot be normalized are passed through unchanged with a
        warning (best effort).

        Parameters
        ----------
        items : dict
            mapping of names to vectors

        Returns
        -------
        norm_items : dict
            mapping of names to normalized-feature vectors
        """
        norm_items = {}
        for key, value in items.items():
            if key not in self.one_hot_feature_names:
                mean = self.get_mean(key)
                stdev = self.get_stdev(key)
                # only store parameters that were not already known
                update = mean is None or stdev is None
                try:
                    value, mean, stdev = PreProcess.normalize(value,
                                                              mean=mean,
                                                              stdev=stdev)
                    if update:
                        norm_params = {key: {'mean': mean, 'stdev': stdev}}
                        self._norm_params.update(norm_params)
                except Exception as ex:
                    msg = "Could not normalize {}:\n{}".format(key, ex)
                    logger.warning(msg)
                    warn(msg)

            norm_items[key] = value

        return norm_items

    def _normalize_arr(self, arr, names):
        """
        Normalize array and save normalization parameters to given names

        Parameters
        ----------
        arr : ndarray
            Array of features/label to normalize
        names : list
            List of feature/label names

        Returns
        -------
        norm_arr : ndarray
            Normalized features/label

        Raises
        ------
        RuntimeError
            If the number of names does not match the number of items
        """
        n_names = self._get_item_number(arr)
        if len(names) != n_names:
            # report n_names rather than arr.shape[1], which would raise an
            # IndexError for 1D arrays and hide the real error
            msg = ("Number of item names ({}) does not match number of items "
                   "({})".format(len(names), n_names))
            logger.error(msg)
            raise RuntimeError(msg)

        means, stdevs = self.get_norm_params(names)
        update = means is None or stdevs is None

        norm_arr, means, stdevs = PreProcess.normalize(arr, mean=means,
                                                       stdev=stdevs)
        if update:
            for i, n in enumerate(names):
                norm_params = {n: {'mean': means[i], 'stdev': stdevs[i]}}
                self._norm_params.update(norm_params)

        return norm_arr

    def normalize(self, data, names=None):
        """
        Normalize given data

        Parameters
        ----------
        data : dict | pandas.DataFrame | ndarray
            Data to normalize. The input is deep-copied, never modified.
        names : list, optional
            List of data item names, needed to normalized ndarrays,
            by default None

        Returns
        -------
        data : dict | pandas.DataFrame | ndarray
            Normalized data in same format as input (list inputs are
            returned as ndarray)

        Raises
        ------
        RuntimeError
            If names are missing for array data, or the data type is not
            supported
        """
        data = copy.deepcopy(data)
        if isinstance(data, dict):
            data = self._normalize_dict(data)
        elif isinstance(data, pd.DataFrame):
            if self.one_hot_feature_names:
                cols = [c for c in data if c not in self.one_hot_feature_names]
                data.loc[:, cols] = self._normalize_arr(
                    data.loc[:, cols].values, cols)
            else:
                data.loc[:] = self._normalize_arr(data.values, data.columns)
        elif isinstance(data, (list, np.ndarray)):
            if names is None:
                msg = ('Names of items must be supplied to nomralize data '
                       'arrays')
                logger.error(msg)
                raise RuntimeError(msg)
            else:
                # lists have no .shape / 2D indexing; no-op for ndarrays
                data = np.asarray(data)
                if self.one_hot_feature_names:
                    idx = [i for i, f in enumerate(names)
                           if f not in self.one_hot_feature_names]
                    norm_names = np.array(names)[idx]
                    data[:, idx] = self._normalize_arr(data[:, idx],
                                                       norm_names)
                else:
                    data = self._normalize_arr(data, names)
        else:
            msg = "Cannot normalize data of type: {}".format(type(data))
            logger.error(msg)
            raise RuntimeError(msg)

        return data

    def _unnormalize_dict(self, items):
        """
        Un-normalize given dictionary of items (features | labels)

        Parameters
        ----------
        items : dict
            mapping of names to vectors

        Returns
        -------
        native_items : dict
            mapping of names to native vectors. Items without stored
            normalization parameters are passed through unchanged with a
            warning.
        """
        native_items = {}
        for key, value in items.items():
            # .get() so that a name without stored parameters reaches the
            # warning branch below instead of raising a KeyError
            norm_params = self.normalization_parameters.get(key)
            if norm_params is not None:
                value = PreProcess.unnormalize(value, norm_params['mean'],
                                               norm_params['stdev'])
            else:
                msg = ("Normalization Parameters unavailable, {} will not be "
                       "un-normalized!".format(key))
                logger.warning(msg)
                warn(msg)

            native_items[key] = value

        return native_items

    def _unnormalize_df(self, df):
        """
        Un-normalize DataFrame

        Parameters
        ----------
        df : pandas.DataFrame
            DataFrame of features/label to un-normalize

        Returns
        -------
        df : pandas.DataFrame
            Native features/label df if norm params are not None, otherwise
            the input df unchanged (with a warning)
        """
        means, stdevs = self.get_norm_params(df.columns)

        if means is not None and stdevs is not None:
            df = PreProcess.unnormalize(df.copy(), means, stdevs)
        else:
            msg = ("Normalization parameters are unavailable, df will not be "
                   "un-normalized!")
            logger.warning(msg)
            warn(msg)

        return df

    def _unnormalize_arr(self, arr, names):
        """
        Un-normalize array using given names

        Parameters
        ----------
        arr : ndarray
            Array of features/label to un-normalize
        names : list
            List of feature/label names

        Returns
        -------
        arr : ndarray
            Native features/label array if norm params are not None,
            otherwise the input array unchanged (with a warning)

        Raises
        ------
        RuntimeError
            If the number of names does not match the number of items
        """
        n_names = self._get_item_number(arr)
        if len(names) != n_names:
            # report n_names rather than arr.shape[1], which would raise an
            # IndexError for 1D arrays and hide the real error
            msg = ("Number of item names ({}) does not match number of items "
                   "({})".format(len(names), n_names))
            logger.error(msg)
            raise RuntimeError(msg)

        means, stdevs = self.get_norm_params(names)

        if means is not None and stdevs is not None:
            arr = PreProcess.unnormalize(arr.copy(), means, stdevs)
        else:
            msg = ("Normalization parameters are unavailable, arr will not be "
                   "un-normalized!")
            logger.warning(msg)
            warn(msg)

        return arr

    def unnormalize(self, data, names=None):
        """
        Un-normalize given data

        Parameters
        ----------
        data : dict | pandas.DataFrame | ndarray
            Data to un-normalize
        names : list, optional
            List of data item names, needed to un-normalized ndarrays,
            by default None

        Returns
        -------
        data : dict | pandas.DataFrame | ndarray
            Native data in same format as input (list inputs are returned
            as ndarray)

        Raises
        ------
        RuntimeError
            If names are missing for array data, or the data type is not
            supported
        """
        if isinstance(data, dict):
            data = self._unnormalize_dict(data)
        elif isinstance(data, pd.DataFrame):
            data = self._unnormalize_df(data)
        elif isinstance(data, (list, np.ndarray)):
            if names is None:
                msg = ('Names of items must be supplied to un-nomralize data '
                       'arrays')
                logger.error(msg)
                raise RuntimeError(msg)
            else:
                # lists have no .shape / .copy(); no-op for ndarrays
                data = self._unnormalize_arr(np.asarray(data), names)
        else:
            msg = "Cannot un-normalize data of type: {}".format(type(data))
            logger.error(msg)
            raise RuntimeError(msg)

        return data

    def _check_one_hot_feature_names(self, feature_names):
        """
        Check one_hot_feature_names, update feature_names to remove features
        that were one-hot encoded and add in new one-hot features if needed

        Parameters
        ----------
        feature_names : list
            Input feature names
        """
        one_hot_feature_names = self.make_one_hot_feature_names(
            feature_names, self.one_hot_categories)
        if one_hot_feature_names != self.feature_names:
            # work on a copy so the caller's list is not extended
            check_names = list(feature_names)
            if self.label_names is not None:
                check_names += self.label_names

            PreProcess.check_one_hot_categories(self.one_hot_categories,
                                                feature_names=check_names)
            self._feature_names = one_hot_feature_names

    def parse_features(self, features, names=None, **kwargs):
        """Parse features - preprocessing of feature data before training or
        prediction. This will do one-hot encoding based on
        self.one_hot_categories, and feature normalization based on
        self.normalize_features

        Parameters
        ----------
        features : pandas.DataFrame | dict | ndarray
            Features to train on or predict from
        names : list, optional
            List of feature names, by default None
        kwargs : dict, optional
            kwargs for PreProcess.one_hot

        Returns
        -------
        features : ndarray
            Parsed features array normalized and with str columns converted
            to one hot vectors if desired

        Raises
        ------
        RuntimeError
            If the feature names or the number of features do not match
            what the model expects
        """
        features, feature_names = self._parse_data_names(features,
                                                         names=names,
                                                         fallback_prefix='F')

        if self.feature_names is None:
            self._feature_names = feature_names

        check = (bool(self.one_hot_categories)
                 and feature_names is not None
                 and all(np.isin(feature_names, self.input_feature_names)))
        if check:
            self._check_one_hot_feature_names(feature_names)
            kwargs.update({'feature_names': feature_names,
                           'categories': self.one_hot_categories})
            features = PreProcess.one_hot(features, **kwargs)
        # NOTE(review): _parse_data_names has already converted DataFrame
        # and dict inputs to ndarray at this point, so this name check is
        # skipped for them - confirm whether that is intended
        elif (not isinstance(features, np.ndarray)
              and self.feature_names != feature_names):
            msg = ('Expecting features with names: {}, but was provided with: '
                   '{}!'.format(self.feature_names, feature_names))
            logger.error(msg)
            raise RuntimeError(msg)

        if self.normalize_features:
            features = self.normalize(features, names=self.feature_names)

        if (self.feature_dims is not None
                and features.shape[-1] != self.feature_dims):
            # shape[-1] matches the check above and is safe for 1D arrays
            msg = ('data has {} features but expected {}'
                   .format(features.shape[-1], self.feature_dims))
            logger.error(msg)
            raise RuntimeError(msg)

        return features

    def parse_labels(self, labels, names=None):
        """
        Parse labels and normalize if desired

        Parameters
        ----------
        labels : pandas.DataFrame | dict | ndarray
            Labels to train on
        names : list, optional
            List of label names, by default None

        Returns
        -------
        labels : ndarray
            Parsed labels array, normalized if desired

        Raises
        ------
        RuntimeError
            If the label names or the number of labels do not match what
            the model expects
        """
        labels, label_names = self._parse_data_names(labels, names=names,
                                                     fallback_prefix='L')

        if self.label_names is not None:
            n_labels = self._get_item_number(labels)
            if n_labels != len(self.label_names):
                # report n_labels rather than labels.shape[1], which would
                # raise an IndexError for 1D label arrays
                msg = ('data has {} labels but expected {}'
                       .format(n_labels, self.label_dims))
                logger.error(msg)
                raise RuntimeError(msg)

        if self._label_names is None:
            self._label_names = label_names
        # NOTE(review): _parse_data_names has already converted DataFrame
        # and dict inputs to ndarray at this point, so this name check is
        # skipped for them - confirm whether that is intended
        elif (not isinstance(labels, np.ndarray)
              and self.label_names != label_names):
            msg = ('Expecting labels with names: {}, but was provided with: '
                   '{}!'.format(self.label_names, label_names))
            logger.error(msg)
            raise RuntimeError(msg)

        if self.normalize_labels:
            # store parameters under the model's label names (as
            # parse_features does for features) so that predict() can find
            # them again when un-normalizing with self.label_names
            labels = self.normalize(labels, names=self.label_names)

        return labels

    def predict(self, features, table=True, parse_kwargs=None,
                predict_kwargs=None):
        """
        Use model to predict label from given features

        Parameters
        ----------
        features : dict | pandas.DataFrame | ndarray
            features to predict from
        table : bool, optional
            Return pandas DataFrame
        parse_kwargs : dict
            kwargs for cls.parse_features
        predict_kwargs : dict
            kwargs for tensorflow.*.predict

        Returns
        -------
        prediction : ndarray | pandas.DataFrame
            label prediction

        Raises
        ------
        RuntimeError
            If an array of features is given whose width matches neither
            the model features nor the input features
        """
        # copy so the "names" entry added below never leaks into the
        # caller's dictionary
        parse_kwargs = {} if parse_kwargs is None else dict(parse_kwargs)

        if isinstance(features, np.ndarray):
            n_features = features.shape[-1]
            if n_features == self.feature_dims:
                kwargs = {"names": self.feature_names}
                logger.debug('Parsing features with feature_names: {}'
                             .format(self.feature_names))
            elif n_features == len(self.input_feature_names):
                kwargs = {"names": self.input_feature_names}
                logger.debug('Parsing features with input_feature_names: {}'
                             .format(self.input_feature_names))
            else:
                msg = ('Number of features provided ({}) does not match number'
                       ' of model features ({}) or number of input features '
                       '({})'.format(n_features, self.feature_dims,
                                     len(self.input_feature_names)))
                logger.error(msg)
                raise RuntimeError(msg)

            parse_kwargs.update(kwargs)

        features = self.parse_features(features, **parse_kwargs)

        if predict_kwargs is None:
            predict_kwargs = {}

        prediction = self._model.predict(features, **predict_kwargs)

        if self.normalize_labels:
            prediction = self.unnormalize(prediction, names=self.label_names)

        if table and len(prediction.shape) in (1, 2):
            prediction = pd.DataFrame(prediction, columns=self.label_names)

        return prediction