Source code for marmot.marmot_h5_formatter

# -*- coding: utf-8 -*-
"""Main formatting source code to format modelling results for plotting.

This code was orginally written to process PLEXOS HDF5 outputs to get them ready for plotting,
but has since been expanded to allow class additions to process results from any energy 
simulation model. 
Once the data is processed it is outputted as an intermediary HDF5 file format so that
it can be read into the marmot_plot_main.py file

@author: Daniel Levie
"""
# =======================================================================================
# Import Python Libraries
# =======================================================================================

import sys
import time
from pathlib import Path
from typing import Union

import h5py
import pandas as pd

try:
    import marmot.utils.mconfig as mconfig
except ModuleNotFoundError:
    from utils.definitions import INCORRECT_ENTRY_POINT

    print(INCORRECT_ENTRY_POINT.format(Path(__file__).name))
    sys.exit()
import marmot.formatters as formatters
import marmot.utils.dataio as dataio
from marmot.formatters.formatbase import Process
from marmot.utils.definitions import INPUT_DIR, PLEXOS_YEAR_WARNING
from marmot.utils.error_handler import PropertyNotFound
from marmot.utils.loggersetup import SetupLogger

# A bug in pandas requires this to be included,
# otherwise df.to_string truncates long strings. Fix available in Pandas 1.0
# but leaving here in case user version not up to date
pd.set_option("display.max_colwidth", 1000)

formatter_settings = mconfig.parser("formatter_settings")


[docs]class MarmotFormat(SetupLogger): """Main module class to be instantiated to run the formatter. MarmotFormat handles the passing on information to the various Process classes and handles the saving of formatted results. Once the outputs have been processed, they are saved to an intermediary hdf5 file which can then be read into the Marmot plotting code """ def __init__( self, Scenario_name: str, model_solutions_folder: Union[str, Path], properties_file: Union[str, Path, pd.DataFrame], marmot_solutions_folder: Union[str, Path] = None, region_mapping: Union[str, Path, pd.DataFrame] = pd.DataFrame(), emit_names_dict: Union[str, Path, pd.DataFrame, dict] = None, **kwargs, ): """ Args: Scenario_name (str): Name of scenario to process. model_solutions_folder (Union[str, Path]): Directory containing model simulation results subfolders and their files. properties_file (Union[str, Path, pd.DataFrame]): Path to or DataFrame of properties to process. marmot_solutions_folder (Union[str, Path], optional): Direcrory to save Marmot solution files. Defaults to None. region_mapping (Union[str, Path, pd.DataFrame], optional): Path to or Dataframe to map custom regions/zones to create custom aggregations. Aggregations are created by grouping PLEXOS regions. Defaults to pd.DataFrame(). emit_names_dict (Union[str, Path, pd.DataFrame, dict], optional): Path to, DataFrame or dict to rename emissions types. Defaults to None. **kwargs These parameters will be passed to the marmot.utils.loggersetup.SetupLogger class. """ super().__init__("formatter", **kwargs) # Instantiation of SetupLogger self.Scenario_name = Scenario_name self.model_solutions_folder = Path(model_solutions_folder) if marmot_solutions_folder is None: self.marmot_solutions_folder = self.model_solutions_folder else: self.marmot_solutions_folder = Path(marmot_solutions_folder) self.marmot_solutions_folder.mkdir(exist_ok=True) self.properties_file = properties_file self.region_mapping = region_mapping self.emit_names_dict = emit_names_dict @property def properties_file(self) -> pd.DataFrame: """DataFrame containing information on model properties to process. Returns: pd.DataFrame: """ return self._properties_file @properties_file.setter def properties_file(self, properties_file) -> None: if isinstance(properties_file, (str, Path)): try: self._properties_file = pd.read_csv(properties_file) except FileNotFoundError: msg = ( "Could not find specified properties_file csv file; " "check file name and path." ) self.logger.error(msg) raise FileNotFoundError(msg) elif isinstance(properties_file, pd.DataFrame): self._properties_file = properties_file else: msg = ( "Expected a DataFrame or a file path to csv for the properties_file input but " f"recieved a {type(properties_file)}" ) self.logger.error(msg) raise NotImplementedError(msg) @property def region_mapping(self) -> pd.DataFrame: """Region mapping Dataframe to map custom aggregations. Returns: pd.DataFrame: """ return self._region_mapping @region_mapping.setter def region_mapping(self, region_mapping) -> None: if isinstance(region_mapping, (str, Path)): try: region_mapping = pd.read_csv(region_mapping) except FileNotFoundError: msg = ( "Could not find specified region_mapping csv file; " "check file name and path." ) self.logger.error(msg) raise FileNotFoundError(msg) if isinstance(region_mapping, pd.DataFrame): self._region_mapping = region_mapping.astype(str) if "category" in region_mapping.columns: # delete category columns if exists self._region_mapping = self._region_mapping.drop(["category"], axis=1) else: msg = ( "Expected a DataFrame or a file path to csv for the region_mapping input but " f"recieved a {type(region_mapping)}" ) self.logger.error(msg) raise NotImplementedError(msg) @property def emit_names_dict(self) -> dict: """Dictionary of existing emissions names to new names. Returns: dict: Keys Existing names, Values: New names """ return self._emit_names_dict @emit_names_dict.setter def emit_names_dict(self, emit_names_dict) -> None: if isinstance(emit_names_dict, (str, Path)): try: emit_names_dict = pd.read_csv(emit_names_dict) except FileNotFoundError: msg = ( "Could not find specified emit_names dictionary csv file; " "check file name and path." ) self.logger.error(msg) raise FileNotFoundError(msg) if isinstance(emit_names_dict, pd.DataFrame): if len(emit_names_dict.axes[1]) == 2: self._emit_names_dict = ( emit_names_dict.set_index(emit_names_dict.columns[0]) .squeeze() .to_dict() ) else: msg = ( "Expected exactly 2 columns for emit_names_dict input, " f"{len(emit_names_dict.axes[1])} columns were in the DataFrame." ) self.logger.error(msg) raise ValueError(msg) elif isinstance(emit_names_dict, dict): self._emit_names_dict = emit_names_dict elif emit_names_dict is None: self._emit_names_dict = {} else: msg = ( "Expected a DataFrame a dict or a file path to csv for the emit_names_dict input but " f"recieved a {type(emit_names_dict)}" ) self.logger.error(msg) raise NotImplementedError(msg)
[docs] def run_formatter( self, sim_model: str = "PLEXOS", plexos_block: str = "ST", append_block_name: bool = False, process_subset_years: list = None, ) -> None: """Main method to call to begin formatting simulation model results Args: sim_model (str, optional): Name of simulation model to process data for. Defaults to 'PLEXOS'. plexos_block (str, optional): PLEXOS results type. Defaults to 'ST'. append_block_name (bool, optional): Append block type to scenario name. Defaults to False. process_subset_years (list, optional): If provided only process years specified. (Only used for sim_model = ReEDS) Defaults to None. """ if append_block_name: scen_name = f"{self.Scenario_name} {plexos_block}" else: scen_name = self.Scenario_name process_class = getattr(formatters, sim_model.lower())() if not callable(process_class): self.logger.error( "A required module was not found to " f"process {sim_model} results" ) self.logger.error(process_class) raise ModuleNotFoundError( "A required module was not found to " f"process {sim_model} results" ) self.logger.info(f"#### Processing {scen_name} {sim_model} " "Results ####") hdf5_output_name = f"{scen_name}_formatted.h5" input_folder = self.model_solutions_folder.joinpath(str(self.Scenario_name)) output_folder = self.marmot_solutions_folder.joinpath("Processed_HDF5_folder") output_folder.mkdir(exist_ok=True) output_file_path = output_folder.joinpath(hdf5_output_name) process_sim_model: Process = process_class( input_folder, output_file_path, plexos_block=plexos_block, process_subset_years=process_subset_years, region_mapping=self.region_mapping, emit_names_dict=self.emit_names_dict, ) files_list = process_sim_model.get_input_data_paths # init of ExtraProperties class extraprops_init = process_sim_model.EXTRA_PROPERTIES_CLASS(process_sim_model) # ===================================================================== # Process the Outputs # ===================================================================== # Creates Initial HDF5 file for outputting formated data Processed_Data_Out = pd.DataFrame() if output_file_path.is_file(): self.logger.info( f"'{output_file_path}' already exists: New " "variables will be added\n" ) # Skip properties that already exist in *formatted.h5 file. with h5py.File(output_file_path, "r") as f: existing_keys = [key for key in f.keys()] # The processed HDF5 output file already exists. If metadata is already in # this file, leave as is. Otherwise, append it to the file. if "metadata" not in existing_keys: self.logger.info("Adding metadata to processed HDF5 file.") process_sim_model.output_metadata(files_list) if not formatter_settings["skip_existing_properties"]: existing_keys = [] # The processed HDF5 file does not exist. # Create the file and add metadata to it. else: existing_keys = [] # Create empty hdf5 file f = h5py.File(output_file_path, "w") f.close() process_sim_model.output_metadata(files_list) process_properties = self.properties_file.loc[ self.properties_file["collect_data"] == True ] start = time.time() # Main loop to process each output and pass data to functions for _, row in process_properties.iterrows(): Processed_Data_Out = pd.DataFrame() data_chunks = [] self.logger.info(f'Processing {row["group"]} {row["data_set"]}') prop_underscore = row["data_set"].replace(" ", "_") key_path = row["group"] + "_" + prop_underscore # Get name to save property as in formatted h5 file property_key_name = process_sim_model.PROPERTY_MAPPING.get( key_path, key_path ) if property_key_name not in existing_keys: for model in files_list: try: processed_data = process_sim_model.get_processed_data( row["group"], row["data_set"], row["data_type"], model ) except PropertyNotFound as e: self.logger.warning(e.message) data_chunks.append(pd.DataFrame()) break # Check if data is for year interval and of type capacity if ( row["data_type"] == "year" and sim_model == "PLEXOS" and ( (row["data_set"] == "Installed Capacity") | (row["data_set"] == "Export Limit") | (row["data_set"] == "Import Limit") ) ): data_chunks.append(processed_data) self.logger.info( f"{row['data_set']} Year property reported " "from only the first partition" ) break else: data_chunks.append(processed_data) # Combine models Processed_Data_Out = process_sim_model.combine_models(data_chunks) if Processed_Data_Out.empty is False: if row["data_type"] == "year" and sim_model == "PLEXOS": self.logger.info(PLEXOS_YEAR_WARNING) save_attempt = 1 while save_attempt <= 3: try: dataio.save_to_h5( Processed_Data_Out, output_file_path, key=property_key_name, ) save_attempt = 4 except OSError: self.logger.warning( "h5 File is probably in use, " "waiting to attempt to save again" ) time.sleep(60) save_attempt += 1 # Calculate any extra properties extra_prop_functions = extraprops_init.get_extra_properties( property_key_name ) if extra_prop_functions: for prop_function_tup in extra_prop_functions: prop_name, prop_function = prop_function_tup if ( prop_name not in h5py.File(output_file_path, "r") or not formatter_settings["skip_existing_properties"] ): self.logger.info(f"Processing {prop_name}") prop = prop_function( Processed_Data_Out, timescale=row["data_type"], ) if prop.empty is False: dataio.save_to_h5( prop, output_file_path, key=prop_name ) else: self.logger.warning(f"{prop_name} was not saved") continue # Run again to check for properties based of new properties extra2_prop_functions = ( extraprops_init.get_extra_properties(prop_name) ) if extra2_prop_functions: for prop_function_tup2 in extra2_prop_functions: prop_name2, prop_function2 = prop_function_tup2 if ( prop_name2 not in h5py.File(output_file_path, "r") or not formatter_settings[ "skip_existing_properties" ] ): self.logger.info(f"Processing {prop_name2}") prop2 = prop_function2( prop, timescale=row["data_type"], ) if prop2.empty is False: dataio.save_to_h5( prop2, output_file_path, key=prop_name2 ) else: self.logger.warning( f"{prop_name2} was not saved" ) else: continue else: self.logger.info(f"{key_path} already exists in output .h5 file.") self.logger.info("PROPERTY ALREADY PROCESSED\n") continue end = time.time() elapsed = end - start self.logger.info("Main loop took %s minutes", round(elapsed / 60, 2)) self.logger.info(f"Formatting COMPLETED for {scen_name}")
[docs]def main(): """Run the formatting code and format desired properties based on user input files.""" # =================================================================================== # Input Properties # =================================================================================== Marmot_user_defined_inputs = pd.read_csv( INPUT_DIR.joinpath(mconfig.parser("user_defined_inputs_file")), usecols=["Input", "User_defined_value"], index_col="Input", skipinitialspace=True, ) simulation_model = Marmot_user_defined_inputs.loc[ "Simulation_model", "User_defined_value" ].strip() if pd.isna( Marmot_user_defined_inputs.loc["PLEXOS_data_blocks", "User_defined_value"] ): plexos_data_blocks = ["ST"] else: plexos_data_blocks = Marmot_user_defined_inputs.loc[ "PLEXOS_data_blocks", "User_defined_value" ] plexos_data_blocks = [x.strip() for x in plexos_data_blocks.split(",")] # File which determiens which plexos properties to pull from the h5plexos results and # process, this file is in the repo properties_file = pd.read_csv( INPUT_DIR.joinpath( mconfig.parser(f"{simulation_model.lower()}_properties_file") ) ) # Name of the Scenario(s) being run, must have the same name(s) as the folder # holding the runs HDF5 file Scenario_List = Marmot_user_defined_inputs.loc[ "Scenario_process_list", "User_defined_value" ] Scenario_List = [x.strip() for x in Scenario_List.split(",")] # The folder that contains all the simulation model outputs - the files should # be contained in another folder with the Scenario_name model_solutions_folder = Marmot_user_defined_inputs.loc[ "Model_Solutions_folder", "User_defined_value" ].strip() # Folder to save your processed solutions if pd.isna( Marmot_user_defined_inputs.loc["Marmot_Solutions_folder", "User_defined_value"] ): marmot_solutions_folder = None else: marmot_solutions_folder = Marmot_user_defined_inputs.loc[ "Marmot_Solutions_folder", "User_defined_value" ].strip() # This folder contains all the csv required for mapping and selecting outputs # to process. Examples of these mapping files are within the Marmot repo, you # may need to alter these to fit your needs Mapping_folder = INPUT_DIR.joinpath("mapping_folder") if ( pd.isna( Marmot_user_defined_inputs.loc[ "Region_Mapping.csv_name", "User_defined_value" ] ) is True ): region_mapping = pd.DataFrame() else: region_mapping = Mapping_folder.joinpath( Marmot_user_defined_inputs.loc[ "Region_Mapping.csv_name", "User_defined_value" ] ) # Subset of years to process if pd.isna( Marmot_user_defined_inputs.loc["process_subset_years", "User_defined_value"] ): process_subset_years = None else: process_subset_years = Marmot_user_defined_inputs.loc[ "process_subset_years", "User_defined_value" ] # =================================================================================== # Standard Naming of Emissions types (optional) # =================================================================================== emit_names_dict = INPUT_DIR.joinpath( Mapping_folder, Marmot_user_defined_inputs.loc["emit_names.csv_name", "User_defined_value"], ) # =================================================================================== # Loop through scenarios in list # =================================================================================== for Scenario_name in Scenario_List: initiate = MarmotFormat( Scenario_name, model_solutions_folder, properties_file, marmot_solutions_folder=marmot_solutions_folder, region_mapping=region_mapping, emit_names_dict=emit_names_dict, ) if simulation_model == "PLEXOS": for block in plexos_data_blocks: initiate.run_formatter( plexos_block=block, append_block_name=formatter_settings["append_plexos_block_name"], ) else: initiate.run_formatter( sim_model=simulation_model, process_subset_years=process_subset_years )
if __name__ == "__main__": main()