# Source code for python.data_functions

"""
Functions for pulling data
"""
import psycopg2 as pg
import time
import numpy as np
import pandas as pd
import datetime
from datetime import datetime
from multiprocessing import Process, JoinableQueue
from io import StringIO
import gzip
import subprocess
import os
import psutil
import decorators
import utility_functions as utilfunc
import shutil
import glob
import pickle
import sys
import logging
import imp
imp.reload(logging)
import sqlalchemy
import json

#==============================================================================
# Load logger
# Module-level logger shared by every function in this file; configuration
# (handlers, level) lives in utility_functions.get_logger().
logger = utilfunc.get_logger()
#==============================================================================


#==============================================================================
# configure psycopg2 to treat numeric values as floats (improves
# performance of pulling data from the database)
# NOTE: this registration is process-global — it affects every psycopg2
# connection created after import. SQL NULLs map to None; all other NUMERIC
# values are converted straight to Python float instead of decimal.Decimal.
DEC2FLOAT = pg.extensions.new_type(
    pg.extensions.DECIMAL.values,
    'DEC2FLOAT',
    lambda value, curs: float(value) if value is not None else None)
pg.extensions.register_type(DEC2FLOAT)
#==============================================================================


def create_tech_subfolders(out_scen_path, techs, out_subfolders):
    """
    Create one results subfolder per technology under the scenario folder.

    Parameters
    ----------
    out_scen_path : 'directory'
        Path of the scenario folder that will hold the results
    techs : 'string'
        Iterable of technology names (e.g. ['solar'])
    out_subfolders : 'dict'
        Dictionary mapping each technology to a list of subfolder paths

    Returns
    -------
    out_subfolders : 'dict'
        Same dictionary, with the newly created path appended per technology
    """
    for technology in techs:
        # build and create the per-technology output directory
        tech_dir = os.path.join(out_scen_path, technology)
        os.makedirs(tech_dir)
        # record the new path so downstream code can write results into it
        out_subfolders[technology].append(tech_dir)

    return out_subfolders
def create_scenario_results_folder(input_scenario, scen_name, scenario_names, out_dir, dup_n=0):
    """
    Creates scenario results directories

    Parameters
    ----------
    input_scenario : 'directory'
        Scenario inputs pulled from excel file within diffusion/inputs_scenarios folder
    scen_name : 'string'
        Scenario Name
    scenario_names : 'list'
        List of scenario names
    out_dir : 'directory'
        Output directory for scenario subfolders
    dup_n : 'int'
        Number to track duplicate scenarios in scenario_names. Default is 0 unless otherwise specified.

    Returns
    -------
    out_scen_path : 'directory'
        Path for the scenario subfolders to send results
    scenario_names
        Populated list of scenario names
    dup_n : 'int'
        Number to track duplicate scenarios, stepped up by 1 from original value if there is a duplicate
    """
    if scen_name in scenario_names:
        # BUG FIX: the original passed a single tuple to str.format, e.g.
        # "{0}_{1}".format((scen_name, dup_n)), so "{1}" raised IndexError
        # and the rename never happened. Arguments are now passed separately.
        logger.info("Warning: Scenario name {0} is a duplicate. Renaming to {1}_{2}".format(
            scen_name, scen_name, dup_n))
        scen_name = "{0}_{1}".format(scen_name, dup_n)
        dup_n += 1
    scenario_names.append(scen_name)
    out_scen_path = os.path.join(out_dir, scen_name)
    # raises OSError if the folder already exists, which surfaces naming clashes
    os.makedirs(out_scen_path)

    # copy the input scenario spreadsheet into the results folder for traceability
    if input_scenario is not None:
        shutil.copy(input_scenario, out_scen_path)

    return out_scen_path, scenario_names, dup_n
@decorators.fn_timer(logger=logger, tab_level=1, prefix='')
def create_output_schema(pg_conn_string, role, suffix, scenario_list, source_schema='diffusion_template', include_data=True):
    """
    Creates output schema that will be dropped into the database

    Parameters
    ----------
    pg_conn_string : 'string'
        String to connect to pgAdmin database
    role : 'string'
        Owner of schema
    suffix : 'string'
        String to mark the time that model is kicked off. Added to end of schema to act as a unique indentifier
        NOTE: the passed-in value is immediately overwritten below; the effective
        suffix always comes from utilfunc.get_formatted_time().
    scenario_list : 'list'
        List of input scenario file paths; the first entry's filename is used
        to derive an extra suffix for the schema name.
    source_schema : 'SQL schema'
        Schema to be used as template for the output schema
    include_data : 'bool'
        If True includes data from diffusion_shared schema. Default is False

    Returns
    -------
    dest_schema : 'SQL schema'
        Output schema that will house the final results
    """
    inputs = locals().copy()
    # overwrite the caller-supplied suffix with a fresh timestamp, plus
    # microseconds, to guarantee a unique schema name per run
    suffix = utilfunc.get_formatted_time()
    suffix_microsecond = datetime.now().strftime('%f')
    logger.info('Creating output schema based on {source_schema}'.format(**inputs))

    # connect as postgres so the new schema can be owned/assigned freely
    con, cur = utilfunc.make_con(pg_conn_string, role="postgres")
    #con, cur = utilfunc.make_con(pg_conn_string, role="diffusion-schema-writers")

    # check that the source schema exists
    sql = """SELECT count(*)
             FROM pg_catalog.pg_namespace
             WHERE nspname = '{source_schema}';""".format(**inputs)
    check = pd.read_sql(sql, con)
    if check['count'][0] != 1:
        msg = "Specified source_schema ({source_schema}) does not exist.".format(**inputs)
        raise ValueError(msg)

    # derive a short scenario tag from the first scenario filename
    # (third underscore-delimited token, with any trailing '.xlsm' stripped)
    scen_suffix = os.path.split(scenario_list[0])[1].split('_')[2].rstrip('.xlsm')
    dest_schema = 'diffusion_results_{}'.format(suffix + suffix_microsecond + '_' + scen_suffix)
    inputs['dest_schema'] = dest_schema

    # clone_schema() is a server-side function that copies the template schema
    # (optionally with its data) into the new results schema
    sql = '''SELECT diffusion_shared.clone_schema('{source_schema}', '{dest_schema}', '{role}', {include_data});'''.format(**inputs)
    cur.execute(sql)
    con.commit()

    logger.info('\tOutput schema is: {}'.format(dest_schema))

    return dest_schema
@decorators.fn_timer(logger=logger, tab_level=1, prefix='')
def drop_output_schema(pg_conn_string, schema, delete_output_schema):
    """
    Delete the run's output schema from the database, if configured to do so.

    Parameters
    ----------
    pg_conn_string : 'string'
        String to connect to pgAdmin database
    schema : 'SQL schema'
        Schema that will be deleted
    delete_output_schema : 'bool'
        If set to True in config.py, deletes output schema
    """
    inputs = locals().copy()

    # guard clause: when deletion is not requested, just remind the user
    # (comparison against True kept to mirror the original strict check)
    if delete_output_schema != True:
        logger.warning(
            "The output schema (%(schema)s) has not been deleted. Please delete manually when you are finished analyzing outputs." % inputs)
        return

    logger.info('Dropping the Output Schema ({}) from Database'.format(schema))
    con, cur = utilfunc.make_con(pg_conn_string, role="postgres")
    sql = '''DROP SCHEMA IF EXISTS {schema} CASCADE;'''.format(**inputs)
    cur.execute(sql)
    con.commit()
def get_sectors(cur, schema):
    '''
    Fetch the sectors to model from the postgres table view.

    Parameters
    ----------
    cur : 'SQL cursor'
        Cursor
    schema : 'SQL schema'
        Schema in which the sectors exist

    Returns
    -------
    sectors : 'dict'
        Dictionary of sectors to be modeled in table view in postgres
    '''
    query = '''SELECT sectors
               FROM {}.sectors_to_model;'''.format(schema)
    cur.execute(query)
    # single-row view: the 'sectors' column holds the whole mapping
    row = cur.fetchone()
    return row['sectors']
def get_technologies(con, schema):
    '''
    Fetch the technologies to model from the postgres table view.

    Parameters
    ----------
    con : 'SQL connection'
        Connection
    schema : 'SQL schema'
        Schema in which the technologies exist

    Returns
    -------
    techs : 'list'
        List of technologies to be modeled in table view in postgres

    Raises
    ------
    ValueError
        If the input sheet selected no technologies to run.
    '''
    sql = """SELECT CASE WHEN run_tech = 'Solar + Storage' THEN 'solar'::text END AS tech FROM {}.input_main_scenario_options;""".format(schema)

    tech_df = pd.read_sql(sql, con)
    techs = tech_df['tech'].tolist()

    if not techs:
        raise ValueError(
            "No technologies were selected to be run in the input sheet.")

    return techs
def get_agent_file_scenario(con, schema):
    '''
    Fetch the pre-generated agent pickle file name configured in the input sheet.

    Parameters
    ----------
    con : 'SQL connection'
        Connection
    schema : 'SQL schema'
        Schema holding input_main_scenario_options

    Returns
    -------
    agent_file_status : value of the agent_file column (first row)

    Raises
    ------
    ValueError
        If no agent file was specified in the input sheet.
    '''
    sql = """SELECT agent_file as agent_file_status FROM {}.input_main_scenario_options;""".format(schema)

    result = pd.read_sql(sql, con)
    status = result['agent_file_status'].iloc[0]

    if status is None:
        raise ValueError(
            "No pre-generated pkl agent file was provided to be run in the input sheet.")

    return status
def get_bass_params(con, schema):
    '''
    Fetch the Bass diffusion parameters used by the model.

    Parameters
    ----------
    con : 'SQL connection'
        Connection
    schema : 'SQL schema'
        Schema holding input_solar_bass_params

    Returns
    -------
    bass_df : 'pd.df'
        DataFrame of state abbreviation, bass_param_p, bass_param_q,
        teq_yr1 (time equivalency), sector abbreviation, and technology.
    '''
    sql = """SELECT state_abbr,
                    p,
                    q,
                    teq_yr1,
                    sector_abbr,
                    tech
             FROM {schema}.input_solar_bass_params;""".format(schema=schema)

    bass_df = pd.read_sql(sql, con, coerce_float=True)
    # prefix p/q so the column names are unambiguous downstream
    return bass_df.rename(columns={'p': 'bass_param_p', 'q': 'bass_param_q'})
def get_state_incentives(con):
    '''
    Fetch state financial incentives from the shared database table.

    Parameters
    ----------
    con : 'SQL connection'
        Connection

    Returns
    -------
    state_incentives : 'pd.df'
        Pandas DataFrame of state financial incentives.
    '''
    # table was bumped from the 2019 to the 2020 vintage
    sql = """SELECT * FROM diffusion_shared.state_incentives_2020;"""
    return pd.read_sql(sql, con)
def get_itc_incentives(con, schema):
    '''
    Fetch the Investment Tax Credit incentive options for the model run.

    Parameters
    ----------
    con : 'SQL connection'
        Connection
    schema : 'SQL schema'
        Schema holding input_main_itc_options

    Returns
    -------
    itc_options : 'pd.df'
        DataFrame of ITC options with year, sector_abbr,
        itc_fraction_of_capex, tech, min_size_kw, max_size_kw.
    '''
    # sector names are truncated to their 3-letter abbreviation in SQL
    query = """SELECT year,
                      substring(lower(sector), 1, 3) as sector_abbr,
                      itc_fraction,
                      tech,
                      min_size_kw,
                      max_size_kw
               FROM {schema}.input_main_itc_options;""".format(schema=schema)

    itc_df = pd.read_sql(query, con)
    return itc_df.rename(columns={'itc_fraction': 'itc_fraction_of_capex'})
def get_max_market_share(con, schema):
    '''
    Fetch the max market share curves, padded with a terminal zero-share point.

    The UNION ALL appends a synthetic row at payback_period = 30.1 with
    max_market_share = 0 for host-owned systems, so interpolation beyond the
    30-year payback limit falls to zero adoption.

    Parameters
    ----------
    con : 'SQL connection'
        Connection
    schema : 'SQL schema'
        Schema - string - for technology i.e. diffusion_solar

    Returns
    -------
    max_market_share : 'pd.df'
        DataFrame to join on the main df to determine max share;
        keys are sector & payback_period.
    '''
    sql = '''SELECT metric_value, sector_abbr, max_market_share, metric, source, business_model FROM {0}.max_market_curves_to_model UNION ALL SELECT 30.1 as metric_value, sector_abbr, 0::NUMERIC as max_market_share, metric, source, business_model FROM {1}.max_market_curves_to_model WHERE metric_value = 30 AND metric = 'payback_period' AND business_model = 'host_owned';'''.format(schema, schema)

    curves = pd.read_sql(sql, con)
    return curves.rename(columns={'metric_value': 'payback_period'})
def get_rate_escalations(con, schema):
    '''
    Fetch electricity rate escalation multipliers for the model run.

    Escalations are filtered and applied in calc_economics, producing an
    average real compounding rate growth that feeds the cash-flow math.

    Parameters
    ----------
    con : 'SQL connection'
        Connection
    schema : 'SQL schema'
        Schema produced when model is run

    Returns
    -------
    rate_escalations : 'pd.df'
        DataFrame with year, county_id, sector_abbr, nerc_region_abbr, and
        elec_price_multiplier columns, ordered by year, county, sector.
    '''
    query = """SELECT year, county_id, sector_abbr, nerc_region_abbr, escalation_factor as elec_price_multiplier FROM {schema}.rate_escalations_to_model ORDER BY year, county_id, sector_abbr""".format(schema=schema)

    # coerce_float=False: keep database types as-is
    return pd.read_sql(query, con, coerce_float=False)
def get_load_growth(con, schema):
    '''
    Fetch the load growth multipliers applied to electricity load.

    Parameters
    ----------
    con : 'SQL connection'
        Connection
    schema : 'SQL schema'
        Schema produced when model is run

    Returns
    -------
    df : 'pd.df'
        DataFrame with year, county_id, sector_abbr, nerc_region_abbr,
        load_multiplier columns.
    '''
    query = """SELECT year, county_id, sector_abbr, nerc_region_abbr, load_multiplier FROM {schema}.load_growth_to_model;""".format(schema=schema)

    return pd.read_sql(query, con, coerce_float=False)
def get_technology_costs_solar(con, schema):
    '''
    Return technology costs for solar.

    Parameters
    ----------
    con : 'SQL connection'
        Connection
    schema : 'SQL schema'
        Schema produced when model is run

    Returns
    -------
    df : 'pd.df'
        Pandas DataFrame with year, sector_abbr, system_capex_per_kw,
        system_om_per_kw, system_variable_om_per_kw as columns.
    '''
    inputs = locals().copy()

    # BUG FIX: the original called .format(inputs) without the ** unpacking,
    # which raised KeyError: 'schema' on every call because the whole dict was
    # passed as positional argument 0 instead of as keyword arguments.
    sql = """SELECT year, sector_abbr, system_capex_per_kw, system_om_per_kw, system_variable_om_per_kw FROM {schema}.input_pv_prices_to_model;""".format(**inputs)

    df = pd.read_sql(sql, con, coerce_float=False)

    return df
def get_annual_inflation(con, schema):
    '''
    Fetch the inflation rate set in the input sheet (constant across years
    and sectors).

    Parameters
    ----------
    con : 'SQL connection'
        Connection
    schema : 'SQL schema'
        Unused here; the rate lives in diffusion_shared.input_main_market_inflation

    Returns
    -------
    float
        The inflation rate (e.g. 0.025, i.e. 2.5%).
    '''
    sql = '''SELECT * FROM diffusion_shared.input_main_market_inflation;'''
    inflation_df = pd.read_sql(sql, con)
    inflation_df.rename(columns={'inflation': 'inflation_rate'}, inplace=True)
    # single-row, single-value table: return the scalar only
    return inflation_df.values[0][0]
#%%
def make_output_directory_path(suffix):
    '''
    Build the path of the 'results' directory for this model run, suffixed
    with the run timestamp.

    Note: this directory stores metadata about the model run; the run results
    themselves live in the 'agent_outputs' table of the per-run schema in the
    database.

    Parameters
    ----------
    suffix : 'string'
        Timestamp (or other unique tag) appended to the directory name.

    Returns
    -------
    out_dir : 'string'
        '<parent-of-cwd>/runs/results_<suffix>'
    '''
    parent = os.path.dirname(os.getcwd())
    return '{}/runs/results_{}'.format(parent, suffix)
def get_input_scenarios():
    '''
    List the input scenario excel files found in the input_scenarios directory.

    Excel lock/temp files (names containing '~$') are excluded.

    Returns
    -------
    scenarios : 'list'
        Paths of the scenario excel files in ../input_scenarios.
    '''
    matches = glob.glob("../input_scenarios/*.xls*")
    return [path for path in matches if '~$' not in path]
def create_model_years(start_year, end_year, increment=2):
    '''
    Build the list of model years from start_year through end_year
    (inclusive), stepping by `increment` years.

    Parameters
    ----------
    start_year : 'int'
        starting year of the model (e.g. 2014)
    end_year : 'int'
        ending year of the model (e.g. 2050)
    increment : 'int'
        step between successive model years (default 2)

    Returns
    -------
    model_years : 'list'
        model years from start_year to end_year in `increment`-year steps.
    '''
    return list(range(start_year, end_year + 1, increment))
def summarize_scenario(scenario_settings, model_settings):
    '''
    Log high level scenario settings

    Parameters
    ----------
    scenario_settings : object
        Per-scenario settings; this function reads scen_name, region,
        sectors (dict), techs, and end_year.
    model_settings : object
        Global model settings; this function reads start_year.

    Returns
    -------
    None
    '''

    # summarize high level scenario settings
    logger.info('Scenario Settings:')
    logger.info('\tScenario Name: {}'.format(scenario_settings.scen_name))
    logger.info('\tRegion: {}'.format(scenario_settings.region))
    logger.info('\tSectors: {}'.format(list(scenario_settings.sectors.values())))
    logger.info('\tTechnologies: {}'.format(scenario_settings.techs))
    # note: start year comes from model_settings, end year from scenario_settings
    logger.info('\tYears: {0} - {1}'.format(model_settings.start_year, scenario_settings.end_year))

    return
#%%
def get_scenario_options(cur, schema, pg_params):
    '''
    Pull scenario options and log the user running the scenario from dB

    Parameters
    ----------
    cur : 'SQL cursor'
        Cursor
    schema : 'SQL schema'
        Schema holding input_main_scenario_options
    pg_params : 'dict'
        Postgres connection parameters; the "user" entry is recorded on the
        scenario options table.

    Returns
    -------
    results : first row of input_main_scenario_options (after tagging the user)
    '''
    inputs = locals().copy()
    inputs['user'] = str(pg_params.get("user"))

    # tag the scenario options table with the username of whoever ran it
    tag_sql = '''ALTER TABLE {schema}.input_main_scenario_options ADD COLUMN scenario_user text;
                 UPDATE {schema}.input_main_scenario_options SET scenario_user = '{user}' WHERE scenario_name IS NOT NULL'''.format(**inputs)
    cur.execute(tag_sql)

    select_sql = '''SELECT * FROM {schema}.input_main_scenario_options;'''.format(**inputs)
    cur.execute(select_sql)
    rows = cur.fetchall()
    return rows[0]
#%%
def get_nem_state(con, schema):
    '''
    Fetch state-level net metering limits, tagged with a 'BAU' scenario label.

    Note: many states don't have net metering, and the data in
    diffusion_shared.nem_state_limits_2019 may be incomplete or out of date.

    Parameters
    ----------
    con : 'SQL connection'
        Connection
    schema : 'SQL schema'
        Schema produced when model is run

    Returns
    -------
    df : 'pd.df'
        DataFrame of state net metering limits plus a constant scenario column.
    '''
    sql = "SELECT *, 'BAU'::text as scenario FROM diffusion_shared.nem_state_limits_2019;"
    return pd.read_sql(sql, con, coerce_float=False)
def get_nem_state_by_sector(con, schema):
    '''
    Fetch state/sector net metering rules, with special handling for DC.

    DC handling: system size is unknown until the bill calculator runs, and
    differing compensation styles can yield different optimal sizes. We assume
    only res customers (system_size_kw < 100) get full retail net metering,
    while com/ind (system_size_kw >= 100) get net billing only.

    Note: the data in diffusion_shared.nem_scenario_bau_2019 may be incomplete
    or out of date.

    Parameters
    ----------
    con : 'SQL connection'
        Connection
    schema : 'SQL schema'
        Schema produced when model is run

    Returns
    -------
    df : 'pd.df'
        DataFrame of net metering rules with nem_system_kw_limit column.
    '''
    sql = "SELECT *, 'BAU'::text as scenario FROM diffusion_shared.nem_scenario_bau_2019;"
    df = pd.read_sql(sql, con, coerce_float=False)

    # DC res: drop net-billing rows (res keeps full retail net metering)
    drop_dc_res_billing = ((df['state_abbr'] == 'DC') &
                           (df['sector_abbr'] == 'res') &
                           (df['compensation_style'] == 'net billing'))
    df = df[~drop_dc_res_billing]

    # DC non-res: drop net-metering rows (com/ind get net billing only)
    drop_dc_nonres_metering = ((df['state_abbr'] == 'DC') &
                               (df['sector_abbr'] != 'res') &
                               (df['compensation_style'] == 'net metering'))
    df = df[~drop_dc_nonres_metering]

    # DC non-res: zero out the minimum system size limit
    dc_nonres = (df['state_abbr'] == 'DC') & (df['sector_abbr'] != 'res')
    df['min_pv_kw_limit'] = np.where(dc_nonres, 0., df['min_pv_kw_limit'])

    return df.rename(columns={'max_pv_kw_limit': 'nem_system_kw_limit'})
def get_nem_utility_by_sector(con, schema):
    '''
    Fetch utility-level net metering rules by sector, tagged 'BAU'.

    Note: many utilities don't have net metering, and the data in
    diffusion_shared.nem_scenario_bau_by_utility_2019 may be incomplete or
    out of date.

    Parameters
    ----------
    con : 'SQL connection'
        Connection
    schema : 'SQL schema'
        Schema produced when model is run

    Returns
    -------
    df : 'pd.df'
        DataFrame of utility net metering rules with nem_system_kw_limit column.
    '''
    sql = "SELECT *, 'BAU'::text as scenario FROM diffusion_shared.nem_scenario_bau_by_utility_2019;"
    utility_df = pd.read_sql(sql, con, coerce_float=False)
    return utility_df.rename(columns={'max_pv_kw_limit': 'nem_system_kw_limit'})
def get_selected_scenario(con, schema):
    '''
    Fetch the net metering scenario selected in the input sheet.

    Note: net metering data and/or scenarios may be incomplete or out of date.

    Parameters
    ----------
    con : 'SQL connection'
        Connection
    schema : 'SQL schema'
        Unused; the selection lives in diffusion_shared (kept for a uniform
        call signature with the other getters)

    Returns
    -------
    value
        Contents of the 'val' column, first row.
    '''
    # query has no placeholders, so schema is not interpolated here
    sql = "SELECT * FROM diffusion_shared.input_main_nem_selected_scenario;"
    scenario_df = pd.read_sql(sql, con, coerce_float=False)
    return scenario_df['val'][0]
def get_state_to_model(con, schema):
    '''
    Fetch the list of states to model, as specified in the input sheet.

    Selecting an ISO picks the proper geographies (counties and/or states) in
    import_agent_file() in 'input_data_functions.py'; a national (United
    States) run yields every state excluding AK and HI but including DC.

    Parameters
    ----------
    con : 'SQL connection'
        Connection
    schema : 'SQL schema'
        Schema produced when model is run

    Returns
    -------
    state_to_model : 'list'
        List of state abbreviations to model.
    '''
    sql = "SELECT * FROM {}.states_to_model;".format(schema)
    states_df = pd.read_sql(sql, con, coerce_float=False)
    return states_df['state_abbr'].tolist()