Source code for tyche.EpsilonConstraints

"""
Epsilon-constraint optimization.
"""

import numpy  as np
import pandas as pd
import time

from collections    import namedtuple
from scipy.optimize import fmin_slsqp, differential_evolution, shgo
from scipy.optimize import NonlinearConstraint

from mip import Model, MAXIMIZE, MINIMIZE, BINARY, xsum

from .Types import Optimum

[docs]class EpsilonConstraintOptimizer: """ An epsilon-constration multi-objective optimizer. Attributes ---------- evaluator : tyche.Evaluator The technology evaluator. scale : float The scaling factor for output. """ def __init__(self, evaluator, scale = 1e6): """ Parameters ---------- evaluator : tyche.Evaluator The technology evaluator. scale : float The scaling factor for output. """ self.evaluator = evaluator self.scale = scale self._max_metrics = {} # Create and store the set of valid optimization sense parameters for use # in optimization methods self.valid_sense = {'min', 'max'} def _f(self, evaluate, sense, verbose = 0): def f(x): xx = pd.Series(self.scale * x, name = "Amount", index = self.evaluator.max_amount.index) yy = evaluate(xx) if verbose > 2: print ('scaled decision variables: ', xx.values, '\n') print('metric statistics: ', yy.values, '\n') # fmin_slsqp always minimizes the objective function # if this method is maximizing, multiply the objective function by -1.0 to # maximize by minimizing the negative # if this method is minimizing, keep the objective function as is to # minimize if sense == 'max': _obj_mult = -1.0 else: _obj_mult = 1.0 return _obj_mult * yy return f def _fi(self, i, evaluate, sense, verbose = 0): return lambda x: self._f(evaluate, sense, verbose)(x)[i]
[docs] def opt_slsqp( self , metric , sense = None , max_amount = None , total_amount = None , eps_metric = None , statistic = np.mean, initial = None , tol = 1e-8 , maxiter = 50 , verbose = 0 , ): """ Optimize the objective function using the fmin_slsqp algorithm. Parameters ---------- metric : str Name of metric to maximize. sense : str Optimization sense ('min' or 'max'). If no value is provided to this method, the sense value used to create the EpsilonConstraintOptimizer object is used instead. max_amount : Series Maximum investment amounts by R&D category. The index (research area) order must follow that of self.evaluator.max_amount.index total_amount : float Upper limit on total investments summed across all R&D categories. eps_metric : Dict RHS of the epsilon constraint(s) on one or more metrics. Keys are metric names, and the values are dictionaries of the form {'limit': float, 'sense': str}. The sense defines whether the epsilon constraint is a lower or an upper bound, and the value must be either 'upper' or 'lower'. statistic : function Summary statistic used on the sample evaluations; the metric measure that is fed to the optimizer. initial : array of float Initial value of decision variable(s) fed to the optimizer. tol : float Search tolerance fed to the optimizer. maxiter : int Maximum number of iterations the optimizer is permitted to execute. verbose : int Verbosity level returned by the optimizer and this outer function. Defaults to 0. verbose = 0 No messages verbose = 1 Summary message when fmin_slsqp completes verbose = 2 Status of each algorithm iteration and summary message verbose = 3 Investment constraint status, metric constraint status, status of each algorithm iteration, and summary message verbose > 3 All metric values, decision variable values, investment constraint status, metric constraint status, status of each algorithm iteration, and summary message """ # If no optimization sense is provided, use the default value from self. # If an optimization sense IS provided, overwrite the default value with # the provided value. # _sense is the parameter used only in this method if sense is None: print('opt_slsqp: No optimization sense specified; maximizing objective function') sense = 'max' else: if sense not in self.valid_sense: raise ValueError(f'opt_slsqp: sense must be one of {self.valid_sense}') # create a function to evaluate the statistic evaluate = self.evaluator.make_statistic_evaluator(statistic) # get location index of metric i = np.where(self.evaluator.metrics == metric)[0][0] # if custom upper limits on investment amounts by category have not been # defined, get the upper limits from self.evaluator if max_amount is None: max_amount = self.evaluator.max_amount.Amount # scale the upper limits on investment amounts by category down such that # variables and constraints remain on approximately the same range bounds = [(0, x) for x in max_amount / self.scale] # define a function that will construct the investment constraint for the # optimizer, in the correct format def g(x): # create container for the constraints with a dummy value constraints = [1] # if the upper limit on total investments has been defined, if total_amount is not None: # scale the upper limit on investments (see line 107) limit = total_amount / self.scale # sum across all decision variable values (investment amounts) fed to # this function value = sum(x) if verbose == 3: print('Investment limit: ', np.round(limit, 3), ' Investment value: ', np.round(value, 3), ' Constraint met: ', value <= limit) elif verbose > 3: print('Decision variable values: ', np.round(x, 3), ' Investment limit: ', np.round(limit, 3), ' Investment value: ', np.round(value, 3), ' Constraint met: ', value <= limit) # update the constraint container with the LHS value of the # investment constraint as a >= 0 inequality constraint constraints = [limit - value] # exit the total_amount IF statement # if lower limits on metrics have been defined, if eps_metric is not None: # loop through all available metrics for index, info in eps_metric.items(): # get location index of the current metric j = np.where(self.evaluator.metrics == index)[0][0] if info['sense'] == 'lower': value = self._f(evaluate=evaluate, sense='min', verbose=verbose)(x)[j] elif info['sense'] == 'upper': value = self._f(evaluate=evaluate, sense='max', verbose=verbose)(x)[j] else: raise ValueError('opt_slsqp: Epsilon constraint must be upper or lower') if verbose == 3: print('Metric limit: ', np.round(info['limit'], 3), ' Metric value: ', np.round(value, 3), ' Constraint met: ', value >= info['limit']) elif verbose > 3: print('Decision variable values: ', np.round(x, 3), ' Metric limit: ', np.round(info['limit'], 3), ' Metric value: ', np.round(value, 3), ' Constraint met: ', value >= info['limit']) # append the existing constraints container with the LHS value of the # current metric constraint formulated as >= 0 # as the loop executes, one constraint per metric will be added to # the container constraints += [value - info['limit']] return constraints # if no initial decision variable values have been defined, if initial is None: # start the optimizer off at 10% of the upper limit on the decision # variable values initial = max_amount.values / 10 # note time when algorithm started start = time.time() # run the optimizer result = fmin_slsqp( self._fi(i, evaluate, sense, verbose), # callable function that returns the scalar objective function value initial / self.scale , # scaled initial guess values for decision variables (investment amounts, metrics) f_ieqcons = g , # list of functions that return inequality constraints in the form const >= 0 bounds = bounds , # upper and lower bounds on decision variables iter = maxiter, # number of times fmin_slsqp is permitted to iterate acc = tol , # requested accuracy of optimal solution iprint = verbose, # how much information fmin_slsqp returns full_output = True , # return final objective function value and summary information ) elapsed = time.time() - start # calculate the scaled decision variable values that optimize the # objective function x = pd.Series(self.scale * result[0], name = "Amount", index = self.evaluator.max_amount.index) # evaluate the chosen statistic for the scaled decision variable values y = evaluate(x) return Optimum( exit_code = result[3], exit_message = result[4], amounts = x , metrics = y , solve_time = elapsed , opt_sense = sense , )
[docs] def opt_diffev( self , metric , sense = None , max_amount = None , total_amount = None , eps_metric = None , statistic = np.mean , strategy = 'best1bin' , seed = 2 , tol = 0.01 , maxiter = 75 , init = 'latinhypercube', verbose = 0 , ): """ Maximize the objective function using the differential_evoluaion algorithm. Parameters ---------- metric : str Name of metric to maximize. The objective function. sense : str Optimization sense ('min' or 'max'). If no value is provided to this method, the sense value used to create the EpsilonConstraintOptimizer object is used instead. max_amount : Series Maximum investment amounts by R&D category. The index (research area) order must follow that of self.evaluator.max_amount.index total_amount : float Upper limit on total investments summed across all R&D categories. eps_metric : Dict RHS of the epsilon constraint(s) on one or more metrics. Keys are metric names, and the values are dictionaries of the form {'limit': float, 'sense': str}. The sense defines whether the epsilon constraint is a lower or an upper bound, and the value must be either 'upper' or 'lower'. statistic : function Summary statistic used on the sample evaluations; the metric measure that is fed to the optimizer. strategy : str Which differential evolution strategy to use. 'best1bin' is the default. See algorithm docs for full list. seed : int Sets the random seed for optimization by creating a new `RandomState` instance. Defaults to 1. Not setting this parameter means the solutions will not be reproducible. init : str or array-like Type of population initialization. Default is Latin hypercube; alternatives are 'random' or specifying every member of the initial population in an array of shape (popsize, len(variables)). tol : float Relative tolerance for convergence maxiter : int Upper limit on generations of evolution (analogous to algorithm iterations) verbose : int Verbosity level returned by this outer function and the differential_evolution algorithm. verbose = 0 No messages verbose = 1 Objective function value at every algorithm iteration verbose = 2 Investment constraint status, metric constraint status, and objective function value verbose = 3 Decision variable values, investment constraint status, metric constraint status, and objective function value verbose > 3 All metric values, decision variable values, investment constraint status, metric constraint status, and objective function value """ # If no optimization sense is provided, use the default value from self. # If an optimization sense IS provided, overwrite the default value with # the provided value. # _sense is the parameter used only in this method if sense is None: print('opt_diffev: No optimization sense provided; Maximizing objective function') sense = 'max' else: if sense not in self.valid_sense: raise ValueError(f'opt_diffev: sense must be one of {self.valid_sense}') # create a functio to evaluate the statistic evaluate = self.evaluator.make_statistic_evaluator(statistic) # get location index of metric i = np.where(self.evaluator.metrics == metric)[0][0] # if custom upper limits on investment amounts by category have not been # defined, get the upper limits from self.evaluator if max_amount is None: max_amount = self.evaluator.max_amount.Amount # scale the upper limits on investment amounts by category down such that # variables and constraints remain on approximately the same range var_bounds = [(0, x) for x in max_amount / self.scale] # define a function that will construct the investment constraint for the # optimizer, in the correct format def g(x): # create empty container for the constraints constraints = [] # if the upper limit on total investments has been defined, if total_amount is not None: # scale the upper limit on investments (see line 107) limit = total_amount / self.scale # sum across all decision variable values (investment amounts) fed to # this function value = sum(x) if verbose == 2: print('Investment limit: ', np.round(limit, 3), ' Investment value: ', np.round(value, 3), ' Constraint met: ', value <= limit) elif verbose > 2: print('Decision variable values: ', np.round(x, 3), ' Investment limit: ', np.round(limit, 3), ' Investment value: ', np.round(value, 3), ' Constraint met: ', value <= limit) # update the constraint container with the LHS value of the # investment constraint as a >= 0 inequality constraint constraints += [limit - value] # exit the total_amount IF statement # if lower limits on metrics have been defined, if eps_metric is not None: # loop through all available metrics for index, info in eps_metric.items(): # get location index of the current metric j = np.where(self.evaluator.metrics == index)[0][0] # calculate the summary statistic on the current metric if info['sense'] == 'lower': value = self._f(evaluate=evaluate, sense='min', verbose=verbose)(x)[j] elif info['sense'] == 'upper': value = self._f(evaluate=evaluate, sense='max', verbose=verbose)(x)[j] else: raise ValueError('opt_diffev: Epsilon constraint must be upper or lower') if verbose == 3: print('Metric limit: ', np.round(info['limit'], 3), ' Metric value: ', np.round(value, 3), ' Constraint met: ', value <= info['limit']) elif verbose > 3: print('Decision variable values: ', np.round(x, 3), ' Metric limit: ', np.round(info['limit'], 3), ' Metric value: ', np.round(value, 3), ' Constraint met: ', value <= info['limit']) # append the existing constraints container with the LHS value of the # current metric constraint formulated as >= 0 # as the loop executes, one constraint per metric will be added to # the container constraints += [value - info['limit']] return np.array(constraints) # note time when algorithm started start = time.time() # run the optimizer result = differential_evolution( self._fi(i, evaluate, sense, verbose), # callable function that returns the scalar objective function value bounds=var_bounds, # upper and lower bounds on decision variables strategy=strategy, # defines differential evolution strategy to use maxiter=maxiter, # default maximum iterations to execute tol=tol, # default tolerance on returned optimum seed=seed, # specify a random seed for reproducible optimizations disp=verbose>=1, # print objective function at every iteration init=init, # type of population initialization # @note ignore this warning for now constraints=NonlinearConstraint(g, 0.0, np.inf)) elapsed = time.time() - start if result.success: # calculate the scaled decision variable values that optimize the # objective function x = pd.Series(self.scale * result.x, name="Amount", index=self.evaluator.max_amount.index) # evaluate the chosen statistic for the scaled decision variable values y = self.evaluator.evaluate_statistic(x, statistic) return Optimum( exit_code=result.success, exit_message=result.message, amounts=x, metrics=y, solve_time=elapsed, opt_sense=sense ) else: return result, elapsed
[docs] def opt_shgo( self , metric , sense = None , max_amount = None , total_amount = None , eps_metric = None , statistic = np.mean , tol = 0.01 , maxiter = None , sampling_method = 'simplicial', verbose = 0 , ): """ Maximize the objective function using the shgo global optimization algorithm. Parameters ---------- metric : str Name of metric to maximize. sense : str Optimization sense ('min' or 'max'). If no value is provided to this method, the sense value used to create the EpsilonConstraintOptimizer object is used instead. max_amount : Series Maximum investment amounts by R&D category. The index (research area) order must follow that of self.evaluator.max_amount.index total_amount : float Upper metric_limit on total investments summed across all R&D categories. eps_metric : Dict RHS of the epsilon constraint(s) on one or more metrics. Keys are metric names, and the values are dictionaries of the form {'limit': float, 'sense': str}. The sense defines whether the epsilon constraint is a lower or an upper bound, and the value must be either 'upper' or 'lower'. statistic : function Summary metric_statistic used on the sample evaluations; the metric measure that is fed to the optimizer. tol : float Objective function tolerance in stopping criterion. maxiter : int Upper metric_limit on iterations that can be performed. Defaults to None. Specifying this parameter can cause shgo to stall out instead of solving. sampling_method : str Allowable values are 'sobol and 'simplicial'. Simplicial is default, uses less memory, and guarantees convergence (theoretically). Sobol is faster, uses more memory and does not guarantee convergence. Per documentation, Sobol is better for "easier" problems. verbose : int Verbosity level returned by this outer function and the SHGO algorithm. verbose = 0 No messages verbose = 1 Convergence messages from SHGO algorithm verbose = 2 Investment constraint status, metric constraint status, and convergence messages verbose = 3 Decision variable values, investment constraint status, metric constraint status, and convergence messages verbose > 3 All metric values, decision variable values, investment constraint status, metric constraint status, and convergence messages """ # If no optimization sense is provided, use the default value from self. # If an optimization sense IS provided, overwrite the default value with # the provided value. # _sense is the parameter used only in this method if sense is None: print('opt_shgo: No optimization sense provided; Maximizing objective function') sense = 'max' else: if sense not in self.valid_sense: raise ValueError(f'opt_shgo: sense must be one of {self.valid_sense}') # create a functio to evaluate the statistic evaluate = self.evaluator.make_statistic_evaluator(statistic) # get location metric_index of metric i = np.where(self.evaluator.metrics == metric)[0][0] # if custom upper limits on investment amounts by category have not been # defined, get the upper limits from self.evaluator if max_amount is None: max_amount = self.evaluator.max_amount.Amount # scale the upper limits on investment amounts by category down such that # variables and constraints remain on approximately the same range bounds = [(0, x) for x in max_amount / self.scale] # define a dictionary of functions that define individual constraints and # their types (all inequalities) # initialize constraints container constraints = [] # construct the investment constraint # if the upper metric_limit on total investments has been defined, if total_amount is not None: def g_inv(x): # scale the upper metric_limit on investments inv_limit = total_amount / self.scale # sum across all decision variable values (investment amounts) fed to # this function inv_value = sum(x) if verbose == 2: # print the investment limit (RHS of constraint), the investment # amount (LHS of constraint), and a Boolean indicating whether the # investment constraint is met print('Investment limit: ', np.round(inv_limit, 3), ' Investment value: ', np.round(inv_value, 3), ' Constraint met: ', inv_value <= inv_limit) # if verbose is greater than or equal to three elif verbose > 2: # also print the decision variable values print('Decision variable values: ', np.round(x, 3), ' Investment limit: ', np.round(inv_limit, 3), ' Investment value: ', np.round(inv_value, 3), ' Constraint met: ', inv_value <= inv_limit) return inv_limit - inv_value # update the constraint container with the LHS value of the # investment constraint as a <= 0 inequality constraint constraints += [{'type': 'ineq', 'fun': g_inv}] # exit the total_amount IF statement def make_g_metric(mkg_evaluate, mkg_sense, metric_index, metric_limit): j = np.where(self.evaluator.metrics == metric_index)[0][0] def g_metric_fn(x): if mkg_sense == 'upper': met_value = self._f(evaluate=mkg_evaluate, sense='max', verbose=verbose)(x)[j] elif mkg_sense == 'lower': met_value = self._f(evaluate=mkg_evaluate, sense='min', verbose=verbose)(x)[j] else: raise ValueError('opt_shgo: Epsilon constraint must be upper or lower') if verbose == 2: print('Metric limit: ', np.round(metric_limit, 3), ' Metric value: ', np.round(met_value, 3), ' Constraint met: ', met_value >= metric_limit) elif verbose > 2: print('Decision variable values: ', np.round(x, 3), ' Metric limit: ', np.round(metric_limit, 3), ' Metric value: ', np.round(met_value, 3), ' Constraint met: ', met_value >= metric_limit) return met_value - metric_limit return g_metric_fn # if lower limits on metrics have been defined, if eps_metric is not None: # loop through all available metrics for index, info in eps_metric.items(): # append the existing constraints container with the value of the # current metric constraint # as the loop executes, one constraint per metric will be added to the # container g_metric = make_g_metric( mkg_evaluate = evaluate, mkg_sense = info['sense'], metric_index = index, metric_limit = info['limit'] ) constraints += [{'type': 'ineq', 'fun': g_metric}] opt_dict = {'f_tol': tol, 'maxiter': maxiter, 'disp': verbose >= 1} # note time when algorithm started start = time.time() result = shgo( self._fi(i, evaluate, sense, verbose), # callable function that returns the scalar objective function value bounds=bounds, # upper and lower bounds on decision variables constraints=constraints, options=opt_dict, # dictionary of options including max iters sampling_method=sampling_method #sampling method (sobol or simplicial) ) elapsed = time.time() - start if result.success: # calculate the scaled decision variable values that optimize the # objective function x = pd.Series(self.scale * result.x, name="Amount", index=self.evaluator.max_amount.index) # evaluate the chosen metric_statistic for the scaled decision variable values y = evaluate(x) return Optimum( exit_code=result.success, exit_message=result.message, amounts=x, metrics=y, solve_time=elapsed, opt_sense=sense ) else: return result, elapsed
[docs] def optimum_metrics( self , max_amount = None , total_amount = None , sense = None , statistic = np.mean, tol = 1e-8 , maxiter = 50 , verbose = 0 , ): """ Maximum value of metrics. Parameters ---------- max_amount : DataFrame The maximum amounts that can be invested in each category. total_amount : float The maximum amount that can be invested *in toto*. sense : Dict or str Optimization sense for each metric. Must be 'min' or 'max'. If None, then the sense provided to the EpsilonConstraintOptimizer class is used for all metrics. If string, the sense is used for all metrics. statistic : function The statistic used on the sample evaluations. tol : float The search tolerance. maxiter : int The maximum iterations for the search. verbose : int Verbosity level. """ # if no metric_sense is provided, default to maximizing metrics if sense is None: print('optimum_metrics: No optimization sense provided; Maximizing metrics') self._max_metrics = { mtr : self.opt_slsqp( metric = mtr, sense = 'max', max_amount=max_amount, total_amount=total_amount, eps_metric=None, statistic=statistic, tol=tol, maxiter=maxiter, verbose=verbose ) for mtr in self.evaluator.metrics } else: # if metric_sense is a dictionary, use the sense value provided per metric if type(sense) == dict: self._max_metrics = { mtr: self.opt_slsqp( metric = mtr, sense = sense[mtr], max_amount = max_amount, total_amount = total_amount, eps_metric = None, statistic = statistic, tol = tol, maxiter = maxiter, verbose = verbose ) for mtr in self.evaluator.metrics } # if metric_sense is a string, apply that sense to all metrics elif type(sense) == str: self._max_metrics = { mtr: self.opt_slsqp( metric = mtr, sense = sense, max_amount = max_amount, total_amount = total_amount, eps_metric = None, statistic = statistic, tol = tol, maxiter = maxiter, verbose = verbose ) for mtr in self.evaluator.metrics } else: raise TypeError(f'optimum_metrics: sense must be dict or str') # Provide info on any failed metric optimizations for k,v in self._max_metrics.items(): if v.exit_code != 0: print(f'Metric {k} optimization failed: Code {v.exit_code}, {v.exit_message}') v.metrics[k] = -1.0 return pd.Series( [v.metrics[k] for k, v in self._max_metrics.items()], name = "Value", index = self._max_metrics.keys(), )
[docs] def opt_milp( self, metric, sense = None , max_amount = None , total_amount = None , eps_metric = None , statistic = np.mean, sizelimit = 1e6 , verbose = 0 , ): """ Maximize the objective function using a piecewise linear representation to create a mixed integer linear program. Parameters ---------- metric : str Name of metric to maximize sense : str Optimization sense ('min' or 'max'). If no value is provided to this method, the sense value used to create the EpsilonConstraintOptimizer object is used instead. max_amount : Series Maximum investment amounts by R&D category. The index (research area) order must follow that of self.evaluator.max_amount.index total_amount : float Upper limit on total investments summed across all R&D categories. eps_metric : Dict RHS of the epsilon constraint(s) on one or more metrics. Keys are metric names, and the values are dictionaries of the form {'limit': float, 'sense': str}. The sense defines whether the epsilon constraint is a lower or an upper bound, and the value must be either 'upper' or 'lower'. statistic : function Summary statistic (metric measure) fed to evaluator_corners_wide method in Evaluator total_amount : float Upper limit on total investments summed across all R&D categories sizelimit : int Maximum allowed number of binary variables. If the problem size exceeds this limit, pwlinear_milp will exit before building or solving the model. verbose : int A value greater than zero will save the optimization model as a .lp file A value greater than 1 will print out status messages Returns ------- Optimum : NamedTuple exit_code exit_message amounts (None, if no solution found) metrics (None, if no solution found) solve_time opt_sense """ import time time0 = time.time() # If no optimization sense is provided, use the default value from self. # If an optimization sense IS provided, overwrite the default value with # the provided value. # _sense is the parameter used only in this method if sense is None: print('opt_milp: No optimization sense provided; Maximizing objective') sense = 'max' else: if sense not in self.valid_sense: raise ValueError(f'opt_milp: sense must be one of {self.valid_sense}') _start = time.time() # investment categories _categories = self.evaluator.categories # metric to optimize _obj_metric = metric # if custom upper limits on investment amounts by category have not been # defined, get the upper limits from self.evaluator if max_amount is None: max_amount = self.evaluator.max_amount.Amount if verbose > 1: print(f'Getting and processing wide data at %s s' % str(round(time.time() - _start, 1))) # get data frame of elicited metric values by investment level combinations _wide = self.evaluator.evaluate_corners_wide(statistic).reset_index() _nbinary = len(_wide) * (len(_wide)-1) / 2 if _nbinary >= sizelimit: print(f'MILP contains {_nbinary} binary variables and will exit without solving') return None # (combinations of) Investment levels inv_levels = _wide.loc[:, _categories].values.tolist() # Elicited metric values - for objective function m = _wide.loc[:, _obj_metric].values.tolist() # all metric values - for calculating optimal metrics _metric_data = _wide.copy().drop(columns=_categories).values.tolist() # list of metrics _all_metrics = _wide.copy().drop(columns=_categories).columns.values # Number of investment level combinations/metric values I = len(inv_levels) if verbose > 1: print('Data processed at %s s' % str(round(time.time() - _start, 1))) if verbose > 1: print('Building MIP model at %s s' % str(round(time.time() - _start, 1))) # instantiate MILP model if sense == 'max': _model = Model(sense=MAXIMIZE) else: _model = Model(sense=MINIMIZE) bin_vars = [] lmbd_vars = [] #%% time0 = time.time() if verbose > 1: print('Creating %i lambda variables at %s s' % (I, str(round(time.time() - _start, 1)))) # create continuous lambda variables for i in range(I): #lmbd_vars += [_model.add_var(name='lmbd_' + str(i), lb=0.0, ub=1.0)] lmbd_vars.append(_model.add_var(name='lmbd_' + str(i), lb=0.0, ub=1.0)) if verbose > 1: print('Creating %i binary variables and constraints at %s s' % (_nbinary, str(round(time.time() - _start, 1)))) # create binary variables and binary/lambda variable constraints bin_count = 0 for i in range(I): for j in range(i, I): if j != i: # add binary variable bin_vars.append(_model.add_var(name='y_' + str(i) + '_' + str(j), var_type=BINARY)) # add binary/lambda variable constraint _model += bin_vars[bin_count] <= lmbd_vars[i] + lmbd_vars[j],\ 'Interval_Constraint_' + str(i) + '_' + str(j) bin_count += 1 if verbose > 1: print('Creating total budget constraint at %s s' % str(round(time.time() - _start, 1))) # total budget constraint - only if total_amount is an input if total_amount is not None: _model += xsum(lmbd_vars[i] * inv_levels[i][j] for i in range(I) for j in range(len(inv_levels[i]))) <= total_amount,\ 'Total_Budget' if verbose > 1: print('Creating category budget constraints at %s s' % str(round(time.time() - _start, 1))) # constraint on budget for each investment category # this is either fed in as an argument or pulled from evaluator for j in range(len(_categories)): _model += xsum(lmbd_vars[i] * [el[j] for el in inv_levels][i] for i in range(I)) <= max_amount[j],\ 'Budget_for_' + _categories[j].replace(' ', '') if verbose > 1: print('Defining metric constraints at %s s' % str(round(time.time() - _start, 1))) # define metric constraints if lower limits on metrics have been defined if eps_metric is not None: # loop through list of metric minima for index, info in eps_metric.items(): if info['sense'] == 'upper': _eps_mult = -1.0 elif info['sense'] == 'lower': _eps_mult = 1.0 else: raise ValueError('opt_milp: Epsilon constraint must be upper or lower') # add metric constraint on the lambda variables _model += _eps_mult * xsum(lmbd_vars[i] * _wide.loc[:,index].values.tolist()[i] for i in range(I)) >= info['limit'],\ 'Eps_Const_' + index if verbose > 1: print('Defining lambda convexity constraints at %s s' % str(round(time.time() - _start, 1))) time0 = time.time() lmbd_vars = np.asarray(lmbd_vars) # convexity constraint for continuous variables _model += np.sum(lmbd_vars) == 1, 'Lambda_Sum' if verbose > 1: print('Defining binary convexity constraints at %s s' % str(round(time.time() - _start, 1))) # constrain binary variables such that only one interval can be active # at a time bin_var = np.asarray(bin_vars) _model += bin_var.sum() == 1, 'Binary_Sum' if verbose > 1: print('Defining objective function at %s s' % str(round(time.time() - _start, 1))) # objective function _model.objective = xsum(m[i] * lmbd_vars[i] for i in range(I)) # save a copy of the model in LP format if verbose > 0: print('Saving model at %s s' % str(round(time.time() - _start, 1))) _model.write('model.lp') else: # if the verbose parameter is 0, the MIP solver does not print output _model.verbose = 0 if verbose > 1: print('Optimizing at %s s' % str(round(time.time() - _start, 1))) _opt_start = time.time() # find optimal solution _solution = _model.optimize() elapsed = time.time() - _opt_start # if a feasible solution was found, calculate the optimal investment values # and return a populated Optimum tuple if _model.status.value == 0: # get the optimal variable values as two lists if verbose > 1: print('Optimized at %s s' % str(round(time.time() - _start, 1))) lmbd_opt = [] y_opt = [] for v in _model.vars: if 'lmbd' in v.name: lmbd_opt += [v.x] elif 'y' in v.name: y_opt += [v.x] inv_levels_opt = [] if verbose > 1: print('Calculating optimal investment values at %s s' % str(round(time.time() - _start, 1))) # calculate the optimal investment values for i in range(len(_categories)): inv_levels_opt += [sum([lmbd_opt[j] * [el[i] for el in inv_levels][j] for j in range(len(lmbd_opt))])] # construct a Series of optimal investment levels x = pd.Series(inv_levels_opt, name="Amount", index=self.evaluator.max_amount.index) y = pd.Series(None, name="Value", index=_all_metrics, dtype=float) if verbose > 1: print('Calculating optimal metric values at %s s' % str(round(time.time() - _start, 1))) # calculate optimal values of all metrics for j in range(len(_all_metrics)): y[j] = sum( [lmbd_opt[i] * _metric_data[i][j] for i in range(len(_metric_data))] ) if verbose > 1: print('Optimal metric values calculated at %s s' % str(round(time.time() - _start, 1))) return Optimum( exit_code=_model.status.value, exit_message=_model.status, amounts=x, metrics=y, solve_time=elapsed, opt_sense = sense ) # if no feasible solution was found, return a partially empty Optimum tuple else: return Optimum( exit_code=_model.status.value, exit_message=_model.status, amounts=None, metrics=None, solve_time=elapsed, opt_sense=sense )