import os
import warnings
import json
import pandas as pd
import utility_functions as utilfunc
import multiprocessing
import sys
import psycopg2.extras as pgx
import config
import data_functions as datfunc
from excel import excel_functions
#==============================================================================
# Load the module-level logger through the project's utility_functions helper
logger = utilfunc.get_logger()
#==============================================================================
class ModelSettings(object):
    """
    Class containing the model settings parameters.

    Values are normally stored through :meth:`set`, which validates each
    value immediately after assigning it.

    Attributes
    ----------
    model_init : float
    cdate : str
    out_dir : str
    input_agent_dir : str
    input_data_dir : str
    start_year : int
    input_scenarios : list
    pg_params_file : str
    role : str
    pg_params : dict
    pg_engine_params : dict
    pg_conn_string : str
    pg_engine_string : str
    pg_params_log : str
    model_path : str
    local_cores : int
    pg_procs : int
    delete_output_schema : bool
    dynamic_system_sizing : bool
    """

    # Expected type for every property that has a validation rule. Properties
    # missing from this table are accepted with a printed notice only.
    _EXPECTED_TYPES = {
        'model_init': float,
        'cdate': str,
        'out_dir': str,
        'input_agent_dir': str,
        'input_data_dir': str,
        'start_year': int,
        'input_scenarios': list,
        'pg_params_file': str,
        'role': str,
        'pg_params': dict,
        'pg_engine_params': dict,
        'pg_conn_string': str,
        'pg_engine_string': str,
        'pg_params_log': str,
        'model_path': str,
        'local_cores': int,
        'pg_procs': int,
        'delete_output_schema': bool,
        'dynamic_system_sizing': bool,
    }

    # Keys that must be present in the Postgres parameter dictionaries.
    _REQUIRED_PG_KEYS = {'dbname', 'host', 'port', 'password', 'user'}

    def __init__(self):
        self.model_init = None  # type is float
        self.cdate = None  # type is text
        self.out_dir = None  # doesn't exist already, check parent folder exists
        self.start_year = None  # must = 2014
        self.input_scenarios = None  # type is list, is not empty
        self.pg_params_file = None  # path exists
        self.pg_params = None  # type is dict, includes all elements
        self.pg_conn_string = None  # type is text
        self.pg_params_log = None  # type is text, doesn't include pw
        self.model_path = None  # path exists
        self.pg_procs = None  # int; a warning is issued above 16
        self.local_cores = None  # int <= cores on machine
        self.delete_output_schema = None  # bool
        self.dynamic_system_sizing = None  # bool

    def set(self, attr, value):
        """Assign ``value`` to the attribute named ``attr`` and validate it.

        The value is stored before validation runs, so it remains assigned
        even when validation raises.
        """
        setattr(self, attr, value)
        self.validate_property(attr)

    def get(self, attr):
        """Return the current value of the attribute named ``attr``."""
        return getattr(self, attr)

    def add_config(self, config):
        """Copy the run configuration from ``config`` into this object.

        ``config`` must expose ``start_year``, ``model_path``, ``local_cores``,
        ``pg_procs``, ``pg_params_file``, ``delete_output_schema`` and
        ``dynamic_system_sizing`` attributes.
        """
        self.set('start_year', config.start_year)
        # model_path has to be set before set_pg_params, which joins onto it.
        self.set('model_path', config.model_path)
        self.set('local_cores', config.local_cores)
        self.set('pg_procs', config.pg_procs)
        self.set_pg_params(config.pg_params_file)
        self.set('delete_output_schema', config.delete_output_schema)
        self.set('dynamic_system_sizing', config.dynamic_system_sizing)

    def set_pg_params(self, pg_params_file):
        """Load the Postgres parameters from ``pg_params_file`` and store them.

        The file is resolved relative to ``self.model_path``. Six properties
        are set: ``pg_params_file``, ``pg_params``, ``pg_conn_string``,
        ``pg_params_log``, ``pg_engine_params`` and ``pg_engine_string``.
        """
        params_path = os.path.join(self.model_path, pg_params_file)
        pg_params, pg_conn_string = utilfunc.get_pg_params(params_path)
        pg_engine_params, pg_engine_string = utilfunc.get_pg_engine_params(
            params_path)

        # Loggable summary of the connection parameters; the password is
        # deliberately left out.
        loggable = {key: pg_params[key]
                    for key in ('host', 'port', 'dbname', 'user')}
        pg_params_log = json.dumps(loggable, indent=4, sort_keys=True)

        self.set('pg_params_file', pg_params_file)
        self.set('pg_params', pg_params)
        self.set('pg_conn_string', pg_conn_string)
        self.set('pg_params_log', pg_params_log)
        self.set('pg_engine_params', pg_engine_params)
        self.set('pg_engine_string', pg_engine_string)

    def validate_property(self, property_name):
        """Validate the current value of ``property_name``.

        Raises
        ------
        ValueError
            If the value is ``None`` or fails a property-specific rule.
        TypeError
            If the value is not of the expected type.
        """
        value = self.get(property_name)

        # for all properties -- check not null (identity test, so values with
        # an element-wise ``==`` cannot confuse the check)
        if value is None:
            raise ValueError('{} has not been set'.format(property_name))

        expected_type = self._EXPECTED_TYPES.get(property_name)
        if expected_type is None:
            print('No validation method for property {} exists.'.format(property_name))
            return

        # check type
        try:
            check_type(value, expected_type)
        except TypeError as e:
            raise TypeError('Invalid {0}: {1}'.format(property_name, e)) from e

        # validation for specific properties
        if property_name == 'start_year':
            if value != 2014:
                raise ValueError(
                    'Invalid {}: must be set to 2014'.format(property_name))
        elif property_name == 'input_scenarios':
            if not value:
                raise ValueError(
                    "Invalid {}: No input scenario spreadsheet were found in the input_scenarios folder.".format(property_name))
        elif property_name in ('pg_params_file', 'model_path'):
            # NOTE(review): pg_params_file is checked relative to the current
            # working directory, while set_pg_params loads it relative to
            # model_path -- confirm this is intended.
            if not os.path.exists(value):
                raise ValueError('Invalid {}: does not exist'.format(property_name))
        elif property_name in ('pg_params', 'pg_engine_params'):
            # every required key must be present in the dictionary being
            # validated; extra keys are allowed
            if not self._REQUIRED_PG_KEYS.issubset(value):
                raise ValueError('Invalid {0}: missing required keys ({1})'.format(
                    property_name, self._REQUIRED_PG_KEYS))
        elif property_name == 'pg_params_log':
            # check password is not included
            if 'password' in value:
                raise ValueError(
                    'Invalid {}: password shoud not be included'.format(property_name))
        elif property_name == 'local_cores':
            # check if too large
            if value > multiprocessing.cpu_count():
                raise ValueError(
                    'Invalid {}: value exceeds number of CPUs on local machine'.format(property_name))
        elif property_name == 'pg_procs':
            # warn if too large
            if value > 16:
                warnings.warn(
                    "High {}: may saturate the resources of the Postgres server".format(property_name))

    def validate(self):
        """Validate every attribute currently stored on the instance."""
        for property_name in list(self.__dict__.keys()):
            self.validate_property(property_name)
#%%
class ScenarioSettings(object):
    """Storage of all scenario specific inputs.

    Values are normally stored through :meth:`set`, which validates each
    value immediately after assigning it.
    """

    # Expected type for every property that has a validation rule. Properties
    # missing from this table are accepted with a printed notice only.
    _EXPECTED_TYPES = {
        'scen_name': str,
        'end_year': int,
        'region': str,
        'load_growth': str,
        'random_generator_seed': int,
        'sectors': dict,
        'techs': list,
        'agent_file_status': str,
        'state_to_model': list,
        'input_scenario': str,
        'schema': str,
        'model_years': list,
        'tech_mode': str,
    }

    # Allowed (abbreviation, name) pairs for the ``sectors`` dictionary.
    _VALID_SECTORS = {
        ('res', 'Residential'),
        ('com', 'Commercial'),
        ('ind', 'Industrial'),
    }

    # Technology lists (after sorting) that can currently be run.
    _VALID_TECHS = [
        ['wind'],
        ['solar'],
        ['du'],
        ['ghp'],
    ]

    _VALID_TECH_MODES = ['elec', 'ghp', 'du']

    # Tech modes for which the industrial sector cannot be modeled.
    _GEO_TECH_MODES = ('ghp', 'du')

    def __init__(self):
        self.scen_name = None  # type is text, no spaces
        self.end_year = None  # int <= 2050
        self.region = None
        self.load_growth = None  # valid options only
        self.random_generator_seed = None  # int
        self.sectors = None  # valid options only
        self.techs = None  # valid options only
        self.input_scenario = None  # exists on disk
        self.schema = None  # string
        self.agent_file_status = None  # valid options only
        self.model_years = None  # starts at 2014 and ends <= 2050
        self.tech_mode = None  # valid options only
        self.state_to_model = None  # list

    def set(self, attr, value):
        """Assign ``value`` to the attribute named ``attr`` and validate it.

        The value is stored before validation runs, so it remains assigned
        even when validation raises.
        """
        setattr(self, attr, value)
        self.validate_property(attr)

    def get(self, attr):
        """Return the current value of the attribute named ``attr``."""
        return getattr(self, attr)

    def add_scenario_options(self, scenario_options):
        """Copy the high-level options from the ``scenario_options`` mapping.

        Reads the keys ``scenario_name``, ``end_year``, ``region``,
        ``load_growth`` and ``random_generator_seed``.
        """
        self.set('scen_name', scenario_options['scenario_name'])
        self.set('end_year', scenario_options['end_year'])
        self.set('region', scenario_options['region'])
        self.set('load_growth', scenario_options['load_growth'])
        self.set('random_generator_seed',
                 scenario_options['random_generator_seed'])

    def set_tech_mode(self):
        """Derive ``tech_mode`` from ``techs``.

        ``['wind']`` or ``['solar']`` map to ``'elec'``, ``['du']`` to ``'du'``
        and ``['ghp']`` to ``'ghp'``. Any other value of ``techs`` leaves
        ``tech_mode`` untouched.
        """
        if sorted(self.techs) in [['wind'], ['solar']]:
            self.set('tech_mode', 'elec')
        elif self.techs == ['du']:
            self.set('tech_mode', 'du')
        elif self.techs == ['ghp']:
            self.set('tech_mode', 'ghp')

    def validate_property(self, property_name):
        """Validate the current value of ``property_name``.

        Validating ``sectors`` may remove the ``'ind'`` entry in place, and
        validating ``model_years`` sorts the list in place.

        Raises
        ------
        ValueError
            If the value is ``None`` or fails a property-specific rule.
        TypeError
            If the value is not of the expected type.
        """
        value = self.get(property_name)

        # check not null (identity test, so values with an element-wise
        # ``==`` cannot confuse the check)
        if value is None:
            raise ValueError('{} has not been set'.format(property_name))

        expected_type = self._EXPECTED_TYPES.get(property_name)
        if expected_type is None:
            print('No validation method for property {} exists.'.format(property_name))
            return

        # check type
        try:
            check_type(value, expected_type)
        except TypeError as e:
            raise TypeError('Invalid {0}: {1}'.format(property_name, e)) from e

        if property_name == 'scen_name':
            # confirm no spaces
            if ' ' in value:
                raise ValueError(
                    'Invalid {}: cannot contain spaces.'.format(property_name))
        elif property_name == 'end_year':
            # max of 2050
            if value > 2050:
                raise ValueError(
                    'Invalid {}: end_year must be <= 2050'.format(property_name))
        elif property_name == 'sectors':
            # check all values are valid
            if not set(value.items()).issubset(self._VALID_SECTORS):
                raise ValueError(
                    'Invalid: the only allowable sectors are res, com, ind.')
            if 'ind' in value and self.tech_mode in self._GEO_TECH_MODES:
                # if only ind was selected and tech_mode is ghp or du, do not run
                if len(value) == 1:
                    raise ValueError('Invalid {0}: Cannot run industrial sector for {1}'.format(property_name, self.tech_mode))
                # otherwise drop 'ind' and carry on with the other sectors
                value.pop('ind')
                warnings.warn(
                    'Industrial sector cannot be modeled for geothermal technologies at this time and will be ignored.')
        elif property_name == 'techs':
            if sorted(value) not in self._VALID_TECHS:
                raise ValueError("Invalid {0}: Cannot currently run that combination of technologies. Valid options are: {1}".format(
                    property_name, self._VALID_TECHS))
        elif property_name == 'input_scenario':
            # check the path exists
            if not os.path.exists(value):
                raise ValueError('Invalid {}: does not exist'.format(property_name))
        elif property_name == 'model_years':
            # an empty list cannot be range-checked
            if not value:
                raise ValueError(
                    'Invalid {}: must not be empty.'.format(property_name))
            # sort ascending
            value.sort()
            # make sure starts at 2014
            if value[0] != 2014:
                raise ValueError(
                    'Invalid {}: Must begin with 2014.'.format(property_name))
            # last year must be <= 2050
            if value[-1] > 2050:
                raise ValueError(
                    'Invalid {}: End year must be <= 2050.'.format(property_name))
        elif property_name == 'tech_mode':
            # check valid options
            if value not in self._VALID_TECH_MODES:
                raise ValueError('Invalid {0}: must be one of {1}'.format(property_name, self._VALID_TECH_MODES))

    def validate(self):
        """Validate every attribute currently stored on the instance."""
        for property_name in list(self.__dict__.keys()):
            self.validate_property(property_name)
def check_type(obj, expected_type):
    """Raise ``TypeError`` unless ``obj`` is an instance of ``expected_type``.

    ``expected_type`` is passed straight to ``isinstance``, so it may be a
    single type or a tuple of types. Returns ``None`` when the check passes.
    """
    if not isinstance(obj, expected_type):
        raise TypeError('object type ({0}) does not match expected type ({1})'.format(type(obj), expected_type))
def init_model_settings():
    """initialize Model Settings object (this controls settings that apply to all scenarios to be executed)

    Returns
    -------
    ModelSettings
        A populated settings object on which ``validate()`` has been run.
    """
    model_settings = ModelSettings()
    # add the config to model settings; set model starting time, output directory based on run time, etc.
    model_settings.add_config(config)
    model_settings.set('model_init', utilfunc.get_epoch_time())
    model_settings.set('role', 'postgres')
    model_settings.set('cdate', utilfunc.get_formatted_time())
    model_settings.set('out_dir', datfunc.make_output_directory_path(model_settings.cdate))
    # the input folders sit next to the current working directory, under its parent
    parent_dir = os.path.dirname(os.getcwd())
    model_settings.set('input_data_dir', '{}/input_data'.format(parent_dir))
    model_settings.set('input_agent_dir', '{}/input_agents'.format(parent_dir))
    model_settings.set('input_scenarios', datfunc.get_input_scenarios())
    # validate all model settings
    model_settings.validate()
    return model_settings
def init_scenario_settings(scenario_file, model_settings, con, cur):
    """load scenario specific data and configure output settings

    Parameters
    ----------
    scenario_file : str
        Path of the input scenario; it must exist on disk.
    model_settings : ModelSettings
        Supplies ``pg_conn_string``, ``role``, ``cdate``, ``input_scenarios``,
        ``pg_params`` and ``start_year``.
    con, cur
        Passed through to the ``datfunc`` and ``excel_functions`` helpers;
        presumably a database connection and cursor -- TODO confirm.

    Returns
    -------
    ScenarioSettings
        A populated settings object on which ``validate()`` has been run.

    Raises
    ------
    Exception
        If creating the output schema or loading the scenario fails; the
        original error is chained as the cause.
    """
    scenario_settings = ScenarioSettings()
    scenario_settings.set('input_scenario', scenario_file)

    logger.info("-------------Preparing Database-------------")

    # =========================================================================
    # DEFINE SCENARIO SETTINGS
    # =========================================================================
    try:
        # create an empty schema from diffusion_template
        new_schema = datfunc.create_output_schema(
            model_settings.pg_conn_string,
            model_settings.role,
            model_settings.cdate,
            model_settings.input_scenarios,
            source_schema='diffusion_template',
            include_data=False)
    except Exception as e:
        raise Exception('\tCreation of output schema failed with the following error: {}'.format(e)) from e

    # set the schema
    scenario_settings.set('schema', new_schema)

    # load Input Scenario to the new schema
    try:
        excel_functions.load_scenario(scenario_settings.input_scenario, scenario_settings.schema, con, cur)
    except Exception as e:
        raise Exception('\tLoading failed with the following error: {}'.format(e)) from e

    # read in high level scenario settings
    scenario_settings.set('techs', datfunc.get_technologies(con, scenario_settings.schema))

    # read in settings whether to use pre-generated agent file ('User Defined'- provide pkl file name) or generate new agents
    scenario_settings.set('agent_file_status', datfunc.get_agent_file_scenario(con, scenario_settings.schema))

    # set tech_mode; this must precede 'sectors', whose validation reads tech_mode
    scenario_settings.set_tech_mode()
    scenario_settings.set('sectors', datfunc.get_sectors(cur, scenario_settings.schema))
    scenario_settings.add_scenario_options(datfunc.get_scenario_options(cur, scenario_settings.schema, model_settings.pg_params))
    # model_years depends on end_year, which add_scenario_options has just set
    scenario_settings.set('model_years', datfunc.create_model_years(model_settings.start_year, scenario_settings.end_year))
    scenario_settings.set('state_to_model', datfunc.get_state_to_model(con, scenario_settings.schema))

    # validate scenario settings
    scenario_settings.validate()

    return scenario_settings