Source code for NRWAL.handlers.groups

# -*- coding: utf-8 -*-
"""
Handler objects to interface with NRWAL equation groups (files).
"""
from abc import ABC
import copy
import re
import os
import json
import yaml
import numpy as np
import logging
import operator
from collections import OrderedDict

from NRWAL.handlers.equations import Equation
from NRWAL.utilities.utilities import find_parens

logger = logging.getLogger(__name__)


class AbstractGroup(ABC):
    """Abstract class for groupings of equations or variables in a single
    yaml or json file.
    """

    def __init__(self, group, name=None, interp_extrap_power=False,
                 use_nearest_power=False, interp_extrap_year=False,
                 use_nearest_year=False):
        """
        Parameters
        ----------
        group : str | dict
            String filepath to a yaml or json file containing one or more
            equation strings OR a pre-extracted dictionary from a yaml or
            json file with equation strings as values.
        name : str | None
            Optional name for identification and debugging if this
            AbstractGroup is being initialized with the "group" input
            argument as a pre-extracted dictionary.
        interp_extrap_power : bool
            Flag to interpolate and extrapolate power (MW) dependent
            equations based on the case-insensitive regex pattern:
            "_[0-9]*MW$". This takes preference over the use_nearest_power
            flag. If both interp_extrap_power & use_nearest_power are
            False, a KeyError will be raised if the exact equation name
            request is not found.
        use_nearest_power : bool
            Flag to use the nearest valid power (MW) dependent equation
            based on the case-insensitive regex pattern: "_[0-9]*MW$".
            This is second priority to the interp_extrap_power flag. If
            both interp_extrap_power & use_nearest_power are False, a
            KeyError will be raised if the exact equation name request is
            not found.
        interp_extrap_year : bool
            Flag to interpolate and extrapolate equations keyed by year.
            This takes preference over the use_nearest_year flag. If both
            interp_extrap_year & use_nearest_year are False, a KeyError
            will be raised if the exact equation name request is not found.
        use_nearest_year : bool
            Flag to use the nearest valid equation keyed by year. This is
            second priority to the interp_extrap_year flag. If both
            interp_extrap_year & use_nearest_year are False, a KeyError
            will be raised if the exact equation name request is not found.
        """

        self._base_name = name
        self._dir_name = None
        if isinstance(group, str):
            # A filepath input overrides the name with the file basename
            self._base_name = os.path.basename(group)
            self._dir_name = os.path.dirname(group)

        self._default_variables = {}
        self._interp_extrap_power = interp_extrap_power
        self._use_nearest_power = use_nearest_power
        self._interp_extrap_year = interp_extrap_year
        self._use_nearest_year = use_nearest_year
        self._group = self._parse_group(group)

    def __add__(self, other):
        """Add another equation group to this instance of EquationGroup
        (self) and return a new EquationGroup object that updates this
        instance with the new input. Note that overlapping sub
        EquationGroups in the original EquationGroup may be overwritten by
        the new input if a duplicate key exists.

        Parameters
        ----------
        other : EquationGroup | str | dict
            Another EquationGroup object or filepath to an EquationGroup to
            add to this instance of EquationGroup (self).

        Returns
        -------
        out : EquationGroup
            A new EquationGroup instance with this instance of
            EquationGroup (self) updated with the input EquationGroup.
            Note that overlapping sub EquationGroups in the original
            EquationGroup may be overwritten by the new input if a
            duplicate key exists.
        """
        cls = self.__class__
        if isinstance(other, (str, dict)):
            # Build the other group with the same lookup flags as self
            other = cls(other,
                        interp_extrap_power=self._interp_extrap_power,
                        use_nearest_power=self._use_nearest_power,
                        interp_extrap_year=self._interp_extrap_year,
                        use_nearest_year=self._use_nearest_year)

        out = copy.deepcopy(self)
        out._group.update(other._group)
        out.set_default_variables(other._default_variables)

        return out

    def __repr__(self):
        return str(self)

    @staticmethod
    def _getitem_math(obj, key, workspace):
        """Helper function to recursively perform math for the __getitem__
        method.

        Parameters
        ----------
        obj : EquationGroup | EquationDirectory
            Instance of EquationGroup or EquationDirectory. This is input
            explicitly in a staticmethod instead of an instance method so
            that EquationDirectory can share the method.
        key : str
            A key or set of keys (delimited by "::") to retrieve from this
            EquationGroup instance. For example, if this EquationGroup has
            an equation 'eqn1': 'm*x + b', the input key could be: 'eqn1'
            to retrieve the Equation object that holds 'm*x + b'. The input
            argument key can also be delimited like 'set_1::eqn1' to
            retrieve eqn1 nested in a sub EquationGroup object "set_1". The
            input argument can also have embedded math like
            'set_1::eqn1 + set_2::eqn2 ** 2'.
        workspace : dict
            Temporary workspace to hold parts of math expressions. Useful
            for extracting and caching parenthetical statements.

        Returns
        -------
        out : Equation | EquationGroup
            An object in this instance of EquationGroup keyed by the input
            argument key.

        Raises
        ------
        ValueError
            If the key contains square or curly brackets.
        """

        # order of operator map enforces order of operations: the
        # lowest-precedence operator is split on first
        op_map = OrderedDict()
        op_map['+'] = operator.add
        op_map['-'] = operator.sub
        op_map['*'] = operator.mul
        op_map['/'] = operator.truediv
        op_map['^'] = operator.pow

        key = key.replace('**', '^')

        if any(c in key for c in ('[', ']', '{', '}')):
            msg = ('Cannot parse EquationGroup key with square or curly '
                   'brackets: {}'.format(key))
            logger.error(msg)
            raise ValueError(msg)

        # Evaluate parenthetical statements first and swap them for
        # placeholder keys held in the workspace
        while '(' in key:
            start_loc, end_loc = find_parens(key)[0]
            wkey = 'workspace_{}'.format(1 + len(workspace))
            assert wkey not in workspace
            pk = key[start_loc:end_loc]
            key = key.replace(pk, wkey)
            pk = pk.lstrip('(').rstrip(')')
            workspace[wkey] = obj._getitem(pk, workspace)

        key = key.strip()
        if key in workspace:
            return workspace[key]

        for op_str, op_fun in op_map.items():
            if op_str in key:
                if op_str == '^':
                    # exponentiation is right-associative: split on the
                    # first occurrence so a ^ b ^ c == a ^ (b ^ c)
                    left, _, right = key.partition(op_str)
                else:
                    # +, -, *, / are left-associative: split on the last
                    # occurrence so a - b - c == (a - b) - c
                    left, _, right = key.rpartition(op_str)

                k1 = left.strip()
                k2 = right.strip()

                out1 = workspace.get(k1, None)
                if out1 is None:
                    out1 = obj._getitem(k1, workspace)

                out2 = workspace.get(k2, None)
                if out2 is None:
                    out2 = obj._getitem(k2, workspace)

                return op_fun(out1, out2)

        # No operator left after parenthetical substitution: do a plain
        # retrieval instead of silently returning None
        return obj._getitem(key, workspace)

    def _getitem(self, key, workspace):
        """Protected method for __getitem__ with additional args for
        recursive call.

        Parameters
        ----------
        key : str
            A key or set of keys (delimited by "::") to retrieve from this
            EquationGroup instance. For example, if this EquationGroup has
            an equation 'eqn1': 'm*x + b', the input key could be: 'eqn1'
            to retrieve the Equation object that holds 'm*x + b'. The input
            argument key can also be delimited like 'set_1::eqn1' to
            retrieve eqn1 nested in a sub EquationGroup object "set_1". The
            input argument can also have embedded math like
            'set_1::eqn1 + set_2::eqn2 ** 2'.
        workspace : dict | None
            Temporary workspace to hold parts of math expressions. Useful
            for extracting and caching parenthetical statements.

        Returns
        -------
        out : Equation | EquationGroup
            An object in this instance of EquationGroup keyed by the input
            argument key.

        Raises
        ------
        KeyError
            If the key cannot be retrieved exactly and no nearest or
            interpolated equation is available.
        """

        if workspace is None:
            workspace = {}

        operators = ('+', '-', '*', '/', '^')
        if any(op in key for op in operators):
            return self._getitem_math(self, key, workspace)

        if key not in self and Equation.is_num(key):
            return Equation(key)

        if '::' in str(key):
            keys = key.split('::')
        else:
            keys = [key]

        keys = [str(k) for k in keys]

        interp_extrap = self._interp_extrap_power or self._interp_extrap_year

        out = self._group
        for eqn_key in keys:
            nn_eqns, nn_values, eqn_value = \
                self._get_nn_eqns_values(eqn_key, keys, out)

            if eqn_key in out:
                out = out[eqn_key]

            elif interp_extrap and len(nn_eqns) > 1:
                # Interpolation needs two neighbors. The length check must
                # apply to both the power and the year flag, otherwise the
                # unpacking below raises ValueError instead of KeyError
                x1, x3 = nn_values[0:2]
                y1, y3 = nn_eqns[0:2]
                out = (y3 - y1) * (eqn_value - x1) / (x3 - x1) + y1
                if not any(out.variables):
                    out = Equation(out.eval())

            elif any(nn_eqns):
                out = nn_eqns[0]

            else:
                msg = ('Could not retrieve equation key "{}", '
                       'could not find "{}" in last available keys: {}'
                       .format(key, eqn_key, list(out.keys())))
                logger.error(msg)
                raise KeyError(msg)

        return out

    def __getitem__(self, key):
        """Retrieve a nested Equation or EquationGroup object from this
        instance of an EquationGroup.

        Parameters
        ----------
        key : str
            A key or set of keys (delimited by "::") to retrieve from this
            EquationGroup instance. For example, if this EquationGroup has
            an equation 'eqn1': 'm*x + b', the input key could be: 'eqn1'
            to retrieve the Equation object that holds 'm*x + b'. The input
            argument key can also be delimited like 'set_1::eqn1' to
            retrieve eqn1 nested in a sub EquationGroup object "set_1". The
            input argument can also have embedded math like
            'set_1::eqn1 + set_2::eqn2 ** 2'.

        Returns
        -------
        out : Equation | EquationGroup
            An object in this instance of EquationGroup keyed by the input
            argument key.
        """
        return self._getitem(key, None)

    def __contains__(self, arg):
        return arg in self.keys()

    def _get_nn_eqns_values(self, eqn_key, keys, group):
        """Get lists of the nearest power or year dependent equations.

        Parameters
        ----------
        eqn_key : str
            Current equation retrieval key from the keys list
        keys : list
            List of equation strings delimited by '::'. For example, if
            retrieving "2015::eqn_group::eqn_2012", keys will be:
            ['2015', 'eqn_group', 'eqn_2012']
        group : EquationGroup
            Current group to retrieve equations from. This is typically the
            group level just before the eqn_key

        Returns
        -------
        nn_eqns : list
            List of Equation objects close to eqn_key. Empty list if
            eqn_key is not the last entry in keys.
        nn_values : list
            List of power or year values sorted by distance to eqn_key and
            corresponding to nn_eqns. Empty list if eqn_key is not the last
            entry in keys.
        eqn_value : None | int | float
            Power in MW (float) or year in YYYY format (int) from eqn_key.
            None if eqn_key is not the last entry in keys.
        """

        nn_eqns = []
        nn_values = []
        eqn_value = None
        i = keys.index(eqn_key)

        if i == (len(keys) - 1):
            # Only look for adjacent equations when we're at the last
            # retrieval level in the EquationGroup
            if ((self._interp_extrap_power or self._use_nearest_power)
                    and self.is_power_eqn(eqn_key)):
                nn_eqns, nn_values = \
                    self.find_nearest_power_eqns(eqn_key, group=group)
                eqn_value = self._parse_power(eqn_key)[0]

            elif ((self._interp_extrap_year or self._use_nearest_year)
                    and self.is_year_eqn(eqn_key)):
                nn_eqns, nn_values = \
                    self.find_nearest_year_eqns(eqn_key, group=group)
                eqn_value = self._parse_year(eqn_key)[0]

        return nn_eqns, nn_values, eqn_value

    @classmethod
    def is_power_eqn(cls, key):
        """Determine if an equation key is power-based by looking for the
        case-insensitive regex pattern "_[0-9]*MW$" with at least one
        digit.

        Parameters
        ----------
        key : str
            An equation key/name.

        Returns
        -------
        out : bool
            True if a numeric "_*MW" suffix was found in key
        """
        return cls._parse_power(key)[0] is not None

    @staticmethod
    def _parse_power(key):
        """Parse the numeric power from an equation key

        Parameters
        ----------
        key : str
            A key to retrieve an equation from this EquationGroup. Should
            contain the case-insensitive regex pattern "_[0-9]*MW$".
            Otherwise, None will be returned.

        Returns
        -------
        power : float | None
            The numeric power value in key in the regex pattern
            "_[0-9]*MW$". If the pattern is not found, or the suffix holds
            no digits (e.g. "capacity_MW"), None is returned
        base_str : str
            Key with the trailing power suffix stripped out. Equal to key
            if no power was parsed.
        """
        base_str = key
        power = None
        match = re.search(r'_([0-9]*)MW$', key, flags=re.IGNORECASE)

        # The pattern also matches a bare "_MW" suffix, which carries no
        # number and must not be fed to float()
        if match is not None and match.group(1):
            power = float(match.group(1))
            # strip only the suffix, not every occurrence of its text
            base_str = key[:match.start()]

        return power, base_str

    def find_nearest_power_eqns(self, request, group=None):
        """Find power-based (MW) equations in this EquationGroup that
        match the request (by regex pattern "_[0-9]*MW$") and sort them by
        difference in equation power.

        For example, if the request is "eqn_a_7MW" and there are
        "eqn_a_4MW", "eqn_a_6MW", and "eqn_a_10MW" in this group, this
        method will return [eqn_a_6MW, eqn_a_4MW, eqn_a_10MW], [6, 4, 10]
        (equidistant matches keep their order in the group).

        Parameters
        ----------
        request : str
            A key to retrieve an equation from this EquationGroup. Should
            contain the case-insensitive regex pattern "_[0-9]*MW$".
            Otherwise, empty lists will be returned.
        group : EquationGroup
            Group to be looking in for equations adjacent to the requested
            equation. Defaults to the top level self._group attribute.

        Returns
        -------
        eqns : list
            List of Equation objects that match the request key and are
            sorted by difference in the _*MW specification to the input
            request key. If the request key does not have the _*MW
            specification or if no other keys in this EquationGroup match
            the request then this will return an empty list.
        eqn_powers : list
            List of float power MW values corresponding to eqns and sorted
            by difference in the _*MW specification to the input request
            key. If the request key does not have the _*MW specification or
            if no other keys in this EquationGroup match the request then
            this will return an empty list.
        """

        eqn_keys = []
        eqn_powers = []

        if group is None:
            group = self._group

        req_mw, base_str = self._parse_power(request)

        # Compare against None so that a valid 0 MW value is not dropped
        if req_mw is not None:
            for key in group.keys():
                match_mw, match_base = self._parse_power(key)
                if match_mw is not None and base_str == match_base:
                    eqn_keys.append(key)
                    eqn_powers.append(match_mw)

        if eqn_keys:
            eqn_pow_diffs = np.abs(req_mw - np.array(eqn_powers))
            # stable sort keeps tie order deterministic
            indices = np.argsort(eqn_pow_diffs, kind='stable')
            eqn_keys = list(np.array(eqn_keys)[indices])
            eqn_powers = list(np.array(eqn_powers)[indices])

        eqns = [group[k] for k in eqn_keys]

        return eqns, eqn_powers

    @classmethod
    def is_year_eqn(cls, key):
        """Determine if an equation key is year-based by looking for
        *_YYYY in the key

        Parameters
        ----------
        key : str
            An equation key/name.

        Returns
        -------
        out : bool
            True if a year string *_YYYY is found in key
        """
        return cls._parse_year(key)[0] is not None

    @staticmethod
    def _parse_year(key):
        """Parse the integer year from an equation key

        Parameters
        ----------
        key : str
            A key to retrieve an equation from this EquationGroup. Should
            have the *_YYYY pattern. Otherwise, None will be returned.

        Returns
        -------
        year : int | None
            The numeric year value in key. If the pattern is not found or
            the value is outside of 1800-2200, None is returned
        base_str : str
            Key with the trailing year suffix stripped out. Equal to key if
            no year was parsed.
        """
        base_str = key
        year = None
        match = re.search(r'_([1-2][0-9]{3})$', key)

        if match is not None:
            candidate = int(match.group(1))
            # unlikely to be a year before 1800 or after 2200
            if 1800 <= candidate <= 2200:
                year = candidate
                # strip only the suffix, not every occurrence of its text
                base_str = key[:match.start()]

        return year, base_str

    def find_nearest_year_eqns(self, request, group=None):
        """Find year-based (*_YYYY) equations in this EquationGroup that
        match the request and sort them by difference in equation year.

        Parameters
        ----------
        request : str
            A key to retrieve an equation from this EquationGroup. Should
            have the *_YYYY pattern. Otherwise, empty lists will be
            returned.
        group : EquationGroup
            Group to be looking in for equations adjacent to the requested
            equation. Defaults to the top level self._group attribute.

        Returns
        -------
        eqns : list
            List of Equation objects that match the request key and are
            sorted by difference in the YYYY specification to the input
            request key. If the request key does not have the YYYY
            specification or if no other keys in this EquationGroup match
            the request then this will return an empty list.
        eqn_years : list
            List of integer year YYYY values corresponding to eqns and
            sorted by difference in the YYYY specification to the input
            request key. If the request key does not have the YYYY
            specification or if no other keys in this EquationGroup match
            the request then this will return an empty list.
        """

        eqn_keys = []
        eqn_years = []

        if group is None:
            group = self._group

        req_yr, base_str = self._parse_year(request)

        if req_yr is not None:
            for key in group.keys():
                match_yr, match_base = self._parse_year(key)
                if match_yr is not None and base_str == match_base:
                    eqn_keys.append(key)
                    eqn_years.append(match_yr)

        if eqn_keys:
            eqn_yr_diffs = np.abs(req_yr - np.array(eqn_years))
            # stable sort keeps tie order deterministic
            indices = np.argsort(eqn_yr_diffs, kind='stable')
            eqn_keys = list(np.array(eqn_keys)[indices])
            eqn_years = list(np.array(eqn_years)[indices])

        eqns = [group[k] for k in eqn_keys]

        return eqns, eqn_years

    @staticmethod
    def _parse_group(group):
        """Load a group from file (if a filepath is given) and make sure
        it is a dictionary with string keys.

        Parameters
        ----------
        group : str | dict
            String filepath to a yaml or json file containing one or more
            equation strings OR a pre-extracted dictionary from a yaml or
            json file with equation strings as values.

        Returns
        -------
        group : dict
            Loaded dictionary from a yaml or json file with equation
            strings or nested equation group dictionaries as values.

        Raises
        ------
        FileNotFoundError
            If group is a filepath that does not exist.
        ValueError
            If group is a filepath without a json/yml/yaml extension.
        TypeError
            If the loaded or input group is not a dictionary.
        """

        if isinstance(group, str):
            if not os.path.exists(group):
                msg = 'Cannot find equation file path: {}'.format(group)
                logger.error(msg)
                raise FileNotFoundError(msg)

            if group.endswith('.json'):
                with open(group, 'r') as f:
                    group = json.load(f)

            elif group.endswith(('.yml', '.yaml')):
                with open(group, 'r') as f:
                    group = yaml.safe_load(f)

            else:
                msg = ('Cannot load file path, must be json or yaml: {}'
                       .format(group))
                logger.error(msg)
                raise ValueError(msg)

        if not isinstance(group, dict):
            msg = 'Cannot use group of type: {}'.format(type(group))
            logger.error(msg)
            raise TypeError(msg)

        # yaml/json loaders may produce non-string keys (e.g. years)
        group = {str(k): v for k, v in group.items()}

        return group

    def set_default_variables(self, var_dict):
        """Set default variables available to this object and all
        sub-groups and equations within this object.

        Parameters
        ----------
        var_dict : dict | None
            Default variables namespace. Variables from this input will be
            passed to all Equation objects in this EquationGroup. These
            variables can always be overwritten when Equation.evaluate()
            is called.
        """
        if var_dict is not None:
            self._default_variables.update(copy.deepcopy(var_dict))
            for v in self.values():
                # Plain numeric values (e.g. the floats held by a
                # VariableGroup) have no variable namespace to update
                if hasattr(v, 'set_default_variables'):
                    v.set_default_variables(var_dict)

    @classmethod
    def _r_all_equations(cls, obj):
        """Recursively retrieve all Equation objects from an EquationGroup
        or EquationDirectory object

        Parameters
        ----------
        obj : EquationGroup | EquationDirectory
            Group or directory of equations to recursively search for base
            Equation objects.

        Returns
        -------
        eqns : list
            List of all Equation objects extracted from the input object.
        """

        eqns = []
        for v in obj.values():
            if isinstance(v, Equation):
                eqns.append(v)
            elif not isinstance(v, (int, float, str)):
                eqns += cls._r_all_equations(v)

        return eqns

    def head(self, n=5):
        """Return the first n lines of the group string representation"""
        return '\n'.join(str(self).split('\n')[:n])

    def tail(self, n=5):
        """Return the last n lines of the group string representation"""
        lines = str(self).split('\n')
        # lines[-0:] would be the whole list, so n=0 is handled explicitly
        return '\n'.join(lines[-1 * n:] if n else [])

    @property
    def all_equations(self):
        """List of all Equation objects from this object."""
        return self._r_all_equations(self)

    def get(self, key, default_value=None):
        """Attempt to get a key from the EquationGroup, return
        default_value if the key could not be retrieved

        Parameters
        ----------
        key : str
            Key to retrieve, same format as for __getitem__.
        default_value : object
            Value returned if retrieving key raises a KeyError. Defaults
            to None, same as dict.get().

        Returns
        -------
        out : Equation | EquationGroup | object
            Retrieved object or default_value.
        """
        try:
            return self[key]
        except KeyError:
            return default_value

    def keys(self):
        """Get the 1st level of equation group keys, same as dict.keys()"""
        return self._group.keys()

    def items(self):
        """Get the 1st level of equation (keys, values), same as
        dict.items().
        """
        return self._group.items()

    def values(self):
        """Get the 1st level of equation values, same as dict.values()"""
        return self._group.values()
class EquationGroup(AbstractGroup):
    """Handler for one json or yaml file (or pre-loaded dictionary) that
    holds multiple wind cost equations, possibly nested in sub-groups.
    """

    def __str__(self):
        if self._base_name is None:
            header = 'EquationGroup object with heirarchy:'
        else:
            header = ('EquationGroup object from "{}" with heirarchy:'
                      .format(self._base_name))

        lines = [header]
        for name, entry in self.items():
            if isinstance(entry, Equation):
                lines.append(str(entry))
                continue

            # Sub-group: show its key, then its body (minus its own header
            # line) indented by one tab
            lines.append(str(name))
            sub_lines = str(entry).split('\n')[1:]
            lines.extend('\t' + sub_line for sub_line in sub_lines)

        return '\n'.join(lines)

    def _parse_group(self, group):
        """Convert the raw contents of an equation file into Equation and
        nested EquationGroup objects.

        Parameters
        ----------
        group : str | dict
            String filepath to a yaml or json file containing one or more
            equation strings OR a pre-extracted dictionary from a yaml or
            json file with equation strings as values.

        Returns
        -------
        group : dict
            Dictionary keyed by equation name with Equation objects or
            nested EquationGroup objects as values.

        Raises
        ------
        ValueError
            If a key in the group is a number.
        TypeError
            If a value is not a string, float, int, or dictionary.
        """

        group = super()._parse_group(group)

        sub_group_flags = {
            'interp_extrap_power': self._interp_extrap_power,
            'use_nearest_power': self._use_nearest_power,
            'interp_extrap_year': self._interp_extrap_year,
            'use_nearest_year': self._use_nearest_year,
        }

        for name, raw in sorted(group.items()):
            if Equation.is_num(name):
                msg = ('You cannot use numbers as keys in group "{}"'
                       .format(self._base_name))
                logger.error(msg)
                raise ValueError(msg)

            if isinstance(raw, (str, int, float)):
                group[name] = Equation(raw, name=name)
                continue

            if isinstance(raw, dict):
                # Nested dictionaries become sub-groups of the same class
                # that inherit the lookup flags of this group
                group[name] = self.__class__(raw, name=name,
                                             **sub_group_flags)
                continue

            msg = ('Cannot use equation group value that is not a '
                   'string, float, int, or dictionary: {} ({})'
                   .format(raw, type(raw)))
            logger.error(msg)
            raise TypeError(msg)

        # When an equation's input variable is itself defined in this
        # group, substitute that definition into the equation. Keep
        # sweeping the group until a full pass makes no substitution.
        substituted = True
        while substituted:
            substituted = False
            for name, eqn in group.items():
                if isinstance(eqn, Equation):
                    local_vars = [v for v in eqn.variables if v in group]
                    for var in local_vars:
                        inner = '({})'.format(group[var].full)
                        expanded = eqn.full.replace(var, inner)
                        eqn = eqn.replace_equation(expanded)
                        group[name] = eqn
                        substituted = True

        return group

    @property
    def default_variables(self):
        """Dictionary of default variables (from a variables.yaml file)
        that is accessible to this object

        Returns
        -------
        dict
        """
        return self._default_variables
class VariableGroup(AbstractGroup):
    """Handler for one json or yaml file (or pre-loaded dictionary) of
    numerical variable definitions, i.e. a variables.yaml file.
    """

    def __str__(self):
        lines = ['VariableGroup object with variable definitions:']
        lines.extend('{}: {}'.format(name, value)
                     for name, value in self.items())
        return '\n'.join(lines)

    @property
    def var_dict(self):
        """Variable namespace dictionary: keys are variable names and
        values are single numeric variable values.
        """
        return self._group

    def _parse_group(self, group):
        """Load a group of numerical variable definitions and validate
        that every value is a float (integers are cast to float).

        Parameters
        ----------
        group : str | dict
            String filepath to a yaml or json file containing one or more
            numerical variable definitions OR a pre-extracted dictionary
            from a yaml or json file with variable definitions.

        Returns
        -------
        group : dict
            Loaded dictionary from a yaml or json file with numerical
            variable definitions

        Raises
        ------
        ValueError
            If a key in the group is a number.
        TypeError
            If a value is neither an int nor a float.
        """

        group = super()._parse_group(group)

        for name, raw in group.items():
            if Equation.is_num(name):
                msg = ('You cannot use numbers as keys in group "{}"'
                       .format(self._base_name))
                logger.error(msg)
                raise ValueError(msg)

            value = float(raw) if isinstance(raw, int) else raw

            if not isinstance(value, float):
                msg = ('Cannot use variable group value that is not a '
                       'float: {} ({})'.format(value, type(value)))
                logger.error(msg)
                raise TypeError(msg)

            group[name] = value

        return group