import pandas as pd
import numpy as np
import os
import sqlalchemy
import data_functions as datfunc
import utility_functions as utilfunc
import agent_mutation
from agents import Agents, Solar_Agents
from pandas import DataFrame
import json
# Load logger
logger = utilfunc.get_logger()
#%%
def get_psql_table_fields(engine, schema, name):
"""
Creates numpy array of columns from specified schema and table
Parameters
----------
    engine : 'SQL engine'
        SQL engine to interpret SQL query
schema : 'SQL schema'
SQL schema to pull table from
name : 'string'
Name of the table from which fields are retrieved
Returns
-------
numpy array : 'np.array'
Numpy array of columns
"""
sql = "SELECT column_name FROM information_schema.columns WHERE table_schema = '{}' AND table_name = '{}'".format(schema, name)
return np.concatenate(pd.read_sql_query(sql, engine).values)
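# Usage sketch (hypothetical connection string and table name, for illustration only):
#   engine = sqlalchemy.create_engine('postgresql://user:password@localhost:5432/dgen_db')
#   cols = get_psql_table_fields(engine, 'diffusion_shared', 'example_table')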
def df_to_psql(df, engine, schema, owner, name, if_exists='replace', append_transformations=False):
"""
Uploads dataframe to database
Parameters
----------
df : 'pd.df'
Dataframe to upload to database
    engine : 'SQL engine'
        SQL engine to interpret SQL query
    schema : 'SQL schema'
        Schema in which to upload df
    owner : 'string'
        Owner of schema
    name : 'string'
        Name to be given to table that is uploaded
    if_exists : 'replace or append'
        If the table already exists, 'replace' drops and recreates it, while 'append' adds the dataframe's rows to it
    append_transformations : 'bool'
        If True, JSON-serialized columns are appended as new fields named '<field>_<type>' and the original columns are dropped; if False, the transformed values replace the originals in place
Returns
-------
df : 'pd.df'
Dataframe that was uploaded to database
"""
d_types = {}
transform = {}
f_d_type = {}
sql_type = {}
delete_list = []
orig_fields = df.columns.values
df.columns = [i.lower() for i in orig_fields]
for f in df.columns:
df_filter = pd.notnull(df[f]).values
if sum(df_filter) > 0:
f_d_type[f] = type(df[f][df_filter].values[0]).__name__.lower()
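            # note: these type checks are independent ifs rather than elifs --
            # 'interval' also matches the 'int' prefix, so its later, more
            # specific match must overwrite the INTEGER sql_type set first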
if f_d_type[f][0:3].lower() == 'int':
sql_type[f] = 'INTEGER'
if f_d_type[f][0:5].lower() == 'float':
d_types[f] = sqlalchemy.types.NUMERIC
sql_type[f] = 'NUMERIC'
if f_d_type[f][0:3].lower() == 'str':
sql_type[f] = 'VARCHAR'
if f_d_type[f] == 'list':
d_types[f] = sqlalchemy.types.ARRAY(sqlalchemy.types.STRINGTYPE)
transform[f] = lambda x: json.dumps(x)
sql_type[f] = 'VARCHAR'
if f_d_type[f] == 'ndarray':
d_types[f] = sqlalchemy.types.ARRAY(sqlalchemy.types.STRINGTYPE)
transform[f] = lambda x: json.dumps(list(x))
sql_type[f] = 'VARCHAR'
if f_d_type[f] == 'dict':
d_types[f] = sqlalchemy.types.STRINGTYPE
transform[f] = lambda x: json.dumps(
dict([(k_v[0], list(k_v[1])) if (type(k_v[1]).__name__ == 'ndarray') else (k_v[0], k_v[1]) for k_v in list(x.items())]))
sql_type[f] = 'VARCHAR'
if f_d_type[f] == 'interval':
d_types[f] = sqlalchemy.types.STRINGTYPE
transform[f] = lambda x: str(x)
sql_type[f] = 'VARCHAR'
if f_d_type[f] == 'dataframe':
d_types[f] = sqlalchemy.types.STRINGTYPE
transform[f] = lambda x: x.to_json() if isinstance(x,DataFrame) else str(x)
sql_type[f] = 'VARCHAR'
else:
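            # the column is entirely null: drop it from the upload and from the
            # original field list used to restore column names before returning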
orig_fields = [i for i in orig_fields if i.lower()!=f]
delete_list.append(f)
df = df.drop(delete_list, axis=1)
for k, v in list(transform.items()):
if append_transformations:
df[k + "_" + f_d_type[k]] = df[k].apply(v)
sql_type[k + "_" + f_d_type[k]] = sql_type[k]
del df[k]
del sql_type[k]
else:
df[k] = df[k].apply(v)
conn = engine.connect()
if if_exists == 'append':
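        # before appending, add any dataframe columns missing from the existing table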
fields = [i.lower() for i in get_psql_table_fields(engine, schema, name)]
for f in list(set(df.columns.values) - set(fields)):
sql = "ALTER TABLE {}.{} ADD COLUMN {} {}".format(schema, name, f, sql_type[f])
conn.execute(sql)
df.to_sql(name, engine, schema=schema, index=False, dtype=d_types, if_exists=if_exists)
sql = 'ALTER TABLE {}."{}" OWNER to "{}"'.format(schema, name, owner)
conn.execute(sql)
conn.close()
engine.dispose()
df.columns = orig_fields
return df
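# Usage sketch (hypothetical schema, owner, and table names): integer columns map
# to INTEGER, floats to NUMERIC, and list/dict/array columns are JSON-serialized.
#   df = pd.DataFrame({'a': [1, 2], 'b': [0.5, 1.5]})
#   df_to_psql(df, engine, 'diffusion_shared', 'postgres', 'example_table')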
#%%
def get_scenario_settings(schema, con):
"""
Creates dataframe of default scenario settings from input_main_scenario_options table
Parameters
----------
schema : 'SQL schema'
Schema in which to look for the scenario settings
con : 'SQL connection'
SQL connection to connect to database
Returns
-------
df : 'pd.df'
Dataframe of default scenario settings
"""
sql = "SELECT * FROM {}.input_main_scenario_options".format(schema)
df = pd.read_sql(sql, con)
return df
def get_userdefined_scenario_settings(schema, table_name, con):
"""
Creates dataframe of user created scenario settings
Parameters
----------
    schema : 'SQL schema'
        Schema in which to look for the scenario settings
    table_name : 'string'
        Name of the user-defined scenario settings table
    con : 'SQL connection'
        SQL connection to connect to database
Returns
-------
df : 'pd.df'
Dataframe of user created scenario settings
"""
sql = "SELECT * FROM {}.{}".format(schema, table_name)
df = pd.read_sql(sql, con)
return df
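# Usage sketch (hypothetical schema and table names):
#   defaults = get_scenario_settings('diffusion_template', con)
#   overrides = get_userdefined_scenario_settings('diffusion_template', 'input_elec_prices_user_defined', con)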
#%%
def import_table(scenario_settings, con, engine, role, input_name, csv_import_function=None):
"""
Imports table from csv given the name of the csv
Parameters
----------
    scenario_settings : 'object'
        Scenario settings object; supplies the schema and input data directory
    con : 'SQL connection'
        SQL connection to connect to database
    engine : 'SQL engine'
        SQL engine to interpret SQL query
role : 'string'
Owner of schema
input_name : 'string'
Name of the csv file that should be imported
csv_import_function : 'function'
Specific function to import and munge csv
Returns
-------
df : 'pd.df'
Dataframe of the table that was imported
"""
schema = scenario_settings.schema
shared_schema = 'diffusion_shared'
input_data_dir = scenario_settings.input_data_dir
user_scenario_settings = get_scenario_settings(schema, con)
scenario_name = user_scenario_settings[input_name].values[0]
if scenario_name == 'User Defined':
userdefined_table_name = "input_" + input_name + "_user_defined"
scenario_userdefined_name = get_userdefined_scenario_settings(schema, userdefined_table_name, con)
scenario_userdefined_value = scenario_userdefined_name['val'].values[0]
df = pd.read_csv(os.path.join(input_data_dir, input_name, scenario_userdefined_value + '.csv'), index_col=False)
if csv_import_function is not None:
df = csv_import_function(df)
df_to_psql(df, engine, shared_schema, role, scenario_userdefined_value)
else:
if input_name == 'elec_prices':
df = datfunc.get_rate_escalations(con, scenario_settings.schema)
elif input_name == 'load_growth':
df = datfunc.get_load_growth(con, scenario_settings.schema)
elif input_name == 'pv_prices':
df = datfunc.get_technology_costs_solar(con, scenario_settings.schema)
return df
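# Usage sketch (assumes a scenario_settings object exposing 'schema' and
# 'input_data_dir', as used above; 'postgres' is a hypothetical role name):
#   df = import_table(scenario_settings, con, engine, 'postgres', 'load_growth',
#                     csv_import_function=stacked_sectors)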
#%%
def stacked_sectors(df):
"""
    Unpivots sector-suffixed dataframe fields ('_res', '_com', etc.) into long format, one set of rows per sector
Parameters
----------
df : 'pd.df'
Dataframe to be sorted by sector.
Returns
-------
    output : 'pd.df'
        Dataframe stacked by sector, with a 'sector_abbr' column in place of the sector suffixes
"""
    sectors = ['res', 'ind', 'com', 'nonres', 'all']
output = pd.DataFrame()
core_columns = [x for x in df.columns if x.split("_")[-1] not in sectors]
for sector in sectors:
if sector in set([i.split("_")[-1] for i in df.columns]):
sector_columns = [x for x in df.columns if x.split("_")[-1] == sector]
rename_fields = {k:"_".join(k.split("_")[0:-1]) for k in sector_columns}
temp = df.loc[:,core_columns + sector_columns]
temp = temp.rename(columns=rename_fields)
if sector =='nonres':
sector_list = ['com', 'ind']
elif sector=='all':
sector_list = ['com', 'ind','res']
else:
sector_list = [sector]
for s in sector_list:
temp['sector_abbr'] = s
output = pd.concat([output, temp], ignore_index=True, sort=False)
return output
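# Example: sector-suffixed columns are unpivoted into a 'sector_abbr' column.
#   wide = pd.DataFrame({'year': [2020], 'rate_res': [0.12], 'rate_com': [0.10]})
#   stacked_sectors(wide)
#   #    year  rate sector_abbr
#   # 0  2020  0.12         res
#   # 1  2020  0.10         com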
#%%
def deprec_schedule(df):
"""
    Collapses the per-year depreciation fields into a schedule list and extends the schedule through 2050
Parameters
----------
    df : 'pd.df'
        Dataframe of depreciation schedules with one column per schedule year ('1'-'6')
Returns
-------
output : 'pd.df'
Dataframe of depreciation schedule sorted by year
"""
columns = ['1', '2', '3', '4', '5', '6']
    df['deprec_sch'] = df.apply(lambda x: [x.to_dict()[y] for y in columns], axis=1)
max_required_year = 2050
max_input_year = np.max(df['year'])
missing_years = np.arange(max_input_year + 1, max_required_year + 1, 1)
    last_entry = df[df['year'] == max_input_year]
    # forward-fill the final input year's schedule through 2050
    # (pd.concat replaces DataFrame.append, which was removed in pandas 2.0)
    filled_years = [last_entry.assign(year=year) for year in missing_years]
    df = pd.concat([df] + filled_years, ignore_index=True)
return df.loc[:,['year','sector_abbr','deprec_sch']]
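# Example: if the input schedule ends in 2026, rows for 2027-2050 are appended,
# each reusing the 2026 'deprec_sch' list.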
#%%
def melt_year(parameter_name):
"""
Returns a function to melt dataframe's columns of years and parameter values to the row axis
Parameters
----------
    parameter_name : 'string'
Name of the parameter value in dataframe.
Returns
-------
function : 'function'
Function that melts years and parameter value to row axis
"""
def function(df):
"""
Unpivots years and values from columns of dataframe to rows for each state abbreviation
Parameters
----------
df : 'pd.df'
            Dataframe to be unpivoted.
Returns
-------
df_tidy : 'pd.df'
Dataframe with every other year and the parameter value for that year as rows for each state
"""
years = np.arange(2014, 2051, 2)
years = [str(year) for year in years]
df_tidy = pd.melt(df, id_vars='state_abbr', value_vars=years, var_name='year', value_name=parameter_name)
df_tidy['year'] = df_tidy['year'].astype(int)
return df_tidy
return function
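# Example: for a wide frame with a 'state_abbr' column plus one column per even
# year ('2014', '2016', ..., '2050'), melt_year('load_multiplier')(df) returns
# long-form rows of (state_abbr, year, load_multiplier); note every even-year
# column must be present, or pd.melt will raise a KeyError.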
#%%
def import_agent_file(scenario_settings, con, cur, engine, model_settings, agent_file_status, input_name):
"""
Generates new agents or uses pre-generated agents from provided .pkl file
Parameters
----------
    scenario_settings : 'object'
        Scenario settings object for the current scenario
con : 'SQL connection'
SQL connection to connect to database
    cur : 'SQL cursor'
        Database cursor used to execute SQL statements
    engine : 'SQL engine'
        SQL engine to interpret SQL query
model_settings : 'object'
Model settings that apply to all scenarios
    agent_file_status : 'string'
        Whether to use a pre-generated agent file or to create new agents
    input_name : 'string'
        Substring of the pre-generated agent table's .pkl file name
Returns
-------
solar_agents : 'Class'
Instance of Agents class with either user pre-generated or new data
"""
schema = scenario_settings.schema
input_agent_dir = model_settings.input_agent_dir
state_to_model = scenario_settings.state_to_model
ISO_List = ['ERCOT', 'NEISO', 'NYISO', 'CAISO', 'PJM', 'MISO', 'SPP']
if agent_file_status == 'Use pre-generated Agents':
userdefined_table_name = "input_" + input_name + "_user_defined"
scenario_userdefined_name = get_userdefined_scenario_settings(schema, userdefined_table_name, con)
scenario_userdefined_value = scenario_userdefined_name['val'].values[0]
        solar_agents_df = pd.read_pickle(os.path.join(input_agent_dir, scenario_userdefined_value + ".pkl"))
        # ISO-wide regions use the full agent file; otherwise filter to the modeled states
        if scenario_settings.region not in ISO_List:
            solar_agents_df = solar_agents_df[solar_agents_df['state_abbr'].isin(state_to_model)]
if solar_agents_df.empty:
raise ValueError('Region not present within pre-generated agent file - Edit Inputsheet')
solar_agents = Agents(solar_agents_df)
solar_agents.on_frame(agent_mutation.elec.reassign_agent_tariffs, con)
else:
raise ValueError('Generating agents is not supported at this time. Please select "Use pre-generated Agents" in the input sheet')
return solar_agents
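# Usage sketch (assumes configured scenario/model settings objects and an open
# database connection; 'agent_file' is a hypothetical input name):
#   solar_agents = import_agent_file(scenario_settings, con, cur, engine,
#                                    model_settings, 'Use pre-generated Agents', 'agent_file')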
#%%
def process_elec_price_trajectories(elec_price_traj):
"""
Returns the trajectory of the change in electricity prices over time with 2018 as the base year
Parameters
----------
elec_price_traj : 'pd.df'
Dataframe of electricity prices by year and ReEDS BA
Returns
-------
elec_price_change_traj : 'pd.df'
Dataframe of annual electricity price change factors from base year
"""
county_to_ba_lkup = pd.read_csv('county_to_ba_mapping.csv')
# For SS19, when using Retail Electricity Prices from ReEDS
    base_year_prices = elec_price_traj[elec_price_traj['year'] == 2018].copy()
    base_year_prices.rename(columns={'elec_price_res':'res_base',
                                     'elec_price_com':'com_base',
                                     'elec_price_ind':'ind_base'}, inplace=True)
elec_price_change_traj = pd.merge(elec_price_traj, base_year_prices[['res_base', 'com_base', 'ind_base', 'ba']], on='ba')
elec_price_change_traj['elec_price_change_res'] = elec_price_change_traj['elec_price_res'] / elec_price_change_traj['res_base']
elec_price_change_traj['elec_price_change_com'] = elec_price_change_traj['elec_price_com'] / elec_price_change_traj['com_base']
elec_price_change_traj['elec_price_change_ind'] = elec_price_change_traj['elec_price_ind'] / elec_price_change_traj['ind_base']
# Melt by sector
    res_df = elec_price_change_traj[['year', 'elec_price_change_res', 'ba']].copy()
    res_df.rename(columns={'elec_price_change_res':'elec_price_multiplier'}, inplace=True)
res_df['sector_abbr'] = 'res'
    com_df = elec_price_change_traj[['year', 'elec_price_change_com', 'ba']].copy()
    com_df.rename(columns={'elec_price_change_com':'elec_price_multiplier'}, inplace=True)
com_df['sector_abbr'] = 'com'
    ind_df = elec_price_change_traj[['year', 'elec_price_change_ind', 'ba']].copy()
    ind_df.rename(columns={'elec_price_change_ind':'elec_price_multiplier'}, inplace=True)
ind_df['sector_abbr'] = 'ind'
elec_price_change_traj = pd.concat([res_df, com_df, ind_df], ignore_index=True, sort=False)
elec_price_change_traj = pd.merge(county_to_ba_lkup, elec_price_change_traj, how='left', on=['ba'])
elec_price_change_traj.drop(['ba'], axis=1, inplace=True)
return elec_price_change_traj
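# Example of the normalization above: a BA with elec_price_res of 0.10 in 2018
# and 0.12 in 2030 gets an elec_price_multiplier of 1.2 for 'res' in 2030.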
#%%
def process_wholesale_elec_prices(wholesale_elec_price_traj):
"""
    Returns the trajectory of wholesale electricity prices over time, mapped from ReEDS BAs to counties
Parameters
----------
wholesale_elec_price_traj : 'pd.df'
Dataframe of wholesale electricity prices by year and ReEDS BA
Returns
-------
    wholesale_elec_price_change_traj : 'pd.df'
        Dataframe of annual wholesale electricity prices by county and year
"""
county_to_ba_lkup = pd.read_csv('county_to_ba_mapping.csv')
years = np.arange(2014, 2051, 2)
years = [str(year) for year in years]
wholesale_elec_price_change_traj = pd.melt(wholesale_elec_price_traj, id_vars='ba', value_vars=years, var_name='year', value_name='wholesale_elec_price_dollars_per_kwh')
wholesale_elec_price_change_traj['year'] = wholesale_elec_price_change_traj['year'].astype(int)
wholesale_elec_price_change_traj = pd.merge(county_to_ba_lkup, wholesale_elec_price_change_traj, how='left', on=['ba'])
wholesale_elec_price_change_traj.drop(['ba'], axis=1, inplace=True)
return wholesale_elec_price_change_traj
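# Example: a wide input with columns ['ba', '2014', '2016', ..., '2050'] becomes
# county-level rows of (year, wholesale_elec_price_dollars_per_kwh) after each BA
# is joined to its counties and the 'ba' column is dropped.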
#%%
def process_load_growth(load_growth):
"""
Returns the trajectory of the load growth rates over time relative to a base year of 2014
Parameters
----------
load_growth : 'pd.df'
Dataframe of annual load growth rates
Returns
-------
load_growth_change_traj : 'pd.df'
Dataframe of annual load growth rates relative to base year
"""
    base_year_load_growth = load_growth[load_growth['year'] == 2014].copy()
    base_year_load_growth.rename(columns={'load_growth_res':'res_base',
                                          'load_growth_com':'com_base',
                                          'load_growth_ind':'ind_base'}, inplace=True)
load_growth_change_traj = pd.merge(load_growth, base_year_load_growth[['res_base', 'com_base', 'ind_base', 'census_division_abbr']], on='census_division_abbr')
load_growth_change_traj['load_growth_change_res'] = load_growth_change_traj['load_growth_res'] / load_growth_change_traj['res_base']
load_growth_change_traj['load_growth_change_com'] = load_growth_change_traj['load_growth_com'] / load_growth_change_traj['com_base']
load_growth_change_traj['load_growth_change_ind'] = load_growth_change_traj['load_growth_ind'] / load_growth_change_traj['ind_base']
# Melt by sector
    res_df = load_growth_change_traj[['year', 'load_growth_change_res', 'census_division_abbr']].copy()
    res_df.rename(columns={'load_growth_change_res':'load_multiplier'}, inplace=True)
res_df['sector_abbr'] = 'res'
    com_df = load_growth_change_traj[['year', 'load_growth_change_com', 'census_division_abbr']].copy()
    com_df.rename(columns={'load_growth_change_com':'load_multiplier'}, inplace=True)
com_df['sector_abbr'] = 'com'
    ind_df = load_growth_change_traj[['year', 'load_growth_change_ind', 'census_division_abbr']].copy()
    ind_df.rename(columns={'load_growth_change_ind':'load_multiplier'}, inplace=True)
ind_df['sector_abbr'] = 'ind'
load_growth_change_traj = pd.concat([res_df, com_df, ind_df], ignore_index=True, sort=False)
return load_growth_change_traj
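# Example of the normalization above: a census division with load_growth_res of
# 1.00 in 2014 and 1.10 in 2030 gets a load_multiplier of 1.1 for 'res' in 2030.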