
# -*- coding: utf-8 -*-
"""System transmission plots.

This code creates transmission line and interface plots.

@author: Daniel Levie, Marty Schwarz
"""

import logging
import re
from pathlib import Path
from typing import List

import matplotlib as mpl
import matplotlib.cm as cm
import matplotlib.colors as mcolors
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import marmot.utils.mconfig as mconfig
from marmot.metamanagers.read_metadata import MetaData
from marmot.plottingmodules.plotutils.plot_data_helper import (
    GenCategories,
    PlotDataStoreAndProcessor,
    set_facet_col_row_dimensions,
    set_x_y_dimension,
)
from marmot.plottingmodules.plotutils.plot_exceptions import (
    DataSavedInModule,
    InputSheetError,
    MissingInputData,
    MissingMetaData,
    MissingZoneData,
    UnderDevelopment,
    UnsupportedAggregation,
)
from marmot.plottingmodules.plotutils.plot_library import PlotLibrary
from marmot.plottingmodules.plotutils.styles import ColorList
from marmot.plottingmodules.plotutils.timeseries_modifiers import (
    adjust_for_leapday,
    set_timestamp_date_range,
    sort_duration,
)

logger = logging.getLogger("plotter." + __name__)
plot_data_settings: dict = mconfig.parser("plot_data")
shift_leapday: bool = mconfig.parser("shift_leapday")
xdimension = mconfig.parser("figure_size", "xdimension")
ydimension = mconfig.parser("figure_size", "ydimension")


class Transmission(PlotDataStoreAndProcessor):
    """System transmission plots.

    The transmission.py module contains methods that are
    related to the transmission network.

    Transmission inherits from the PlotDataStoreAndProcessor class to
    assist in creating figures.
    """

    def __init__(
        self,
        Zones: List[str],
        Scenarios: List[str],
        AGG_BY: str,
        ordered_gen: List[str],
        marmot_solutions_folder: Path,
        gen_categories: GenCategories = GenCategories(),
        scenario_diff: List[str] = None,
        ylabels: List[str] = None,
        xlabels: List[str] = None,
        custom_xticklabels: List[str] = None,
        color_list: list = ColorList().colors,
        region_mapping: pd.DataFrame = pd.DataFrame(),
        **kwargs,
    ):
        """
        Args:
            Zones (List[str]): List of regions/zones to plot.
            Scenarios (List[str]): List of scenarios to plot.
            AGG_BY (str): Informs region type to aggregate by when creating plots.
            ordered_gen (List[str]): Ordered list of generator technologies to plot,
                order defines the generator technology position in stacked bar and
                area plots.
            marmot_solutions_folder (Path): Directory containing Marmot solution outputs.
            gen_categories (GenCategories): Instance of GenCategories class, groups
                generator technologies into defined categories.
                Defaults to GenCategories.
            scenario_diff (List[str], optional): 2 value list, used to compare 2 scenarios.
                Defaults to None.
            ylabels (List[str], optional): y-axis labels for facet plots.
                Defaults to None.
            xlabels (List[str], optional): x-axis labels for facet plots.
                Defaults to None.
            custom_xticklabels (List[str], optional): List of custom x labels to
                apply to barplots. Values will overwrite existing ones.
                Defaults to None.
            color_list (list, optional): List of colors to apply to non-gen plots.
                Defaults to ColorList().colors.
            region_mapping (pd.DataFrame, optional): Mapping file to map custom
                regions/zones to create custom aggregations.
                Aggregations are created by grouping PLEXOS regions.
                Defaults to pd.DataFrame().
        """
        # Instantiation of PlotDataStoreAndProcessor
        super().__init__(AGG_BY, ordered_gen, marmot_solutions_folder, **kwargs)

        self.Zones = Zones
        self.Scenarios = Scenarios
        self.gen_categories = gen_categories
        if scenario_diff is None:
            self.scenario_diff = [""]
        else:
            self.scenario_diff = scenario_diff
        self.ylabels = ylabels
        self.xlabels = xlabels
        self.custom_xticklabels = custom_xticklabels
        self.color_list = color_list
        self.region_mapping = region_mapping

        self.meta = MetaData(
            self.processed_hdf5_folder, region_mapping=self.region_mapping
        )
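
    # A minimal usage sketch for this class. It assumes a solutions folder
    # that has already been processed by the Marmot formatter; all paths,
    # zone, scenario, and technology names below are hypothetical:
    #
    # >>> from pathlib import Path
    # >>> tx = Transmission(
    # ...     Zones=["PJM"],
    # ...     Scenarios=["base", "high_vre"],
    # ...     AGG_BY="region",
    # ...     ordered_gen=["Coal", "Gas", "Wind", "Solar"],
    # ...     marmot_solutions_folder=Path("/path/to/solutions"),
    # ... )
    # >>> figs = tx.line_util()  # dict of {zone: {"fig": ..., "data_table": ...}}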
    def line_util(self, **kwargs):
        """Creates a timeseries line plot of transmission lineflow utilization
        for each region.

        Utilization is plotted between 0 and 1 on the y-axis.
        The plot will default to showing the 10 highest utilized lines.
        A Line category can also be passed instead, using the property field
        in the Marmot_plot_select.csv.
        Each scenario is plotted on a separate facet plot.

        This method calls _util() to create the figure.

        Returns:
            dict: Dictionary containing the created plot and its data table.
        """
        outputs = self._util(**kwargs)
        return outputs
    def line_hist(self, **kwargs):
        """Creates a histogram of transmission lineflow utilization for each region.

        Utilization is plotted between 0 and 1 on the x-axis, with the number of
        lines on the y-axis. Each bar is equal to a 0.05 utilization rate.
        The plot will default to showing all lines.
        A Line category can also be passed instead, using the property field
        in the Marmot_plot_select.csv.
        Each scenario is plotted on a separate facet plot.

        This method calls _util() and passes the hist=True argument to
        create the figure.

        Returns:
            dict: Dictionary containing the created plot and its data table.
        """
        outputs = self._util(hist=True, **kwargs)
        return outputs
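
    # Hedged examples of the two public entry points; the line category name
    # "Interregional" is hypothetical and would normally come from the
    # property field of Marmot_plot_select.csv:
    #
    # >>> tx.line_util(prop="Interregional")   # top-10 utilization timeseries
    # >>> tx.line_hist(prop="Interregional")   # utilization histogram, 0.05 bins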
    def _util(
        self,
        hist: bool = False,
        prop: str = None,
        start_date_range: str = None,
        end_date_range: str = None,
        **_,
    ):
        """Creates utilization plots, line plots and histograms.

        This method is called from line_util() and line_hist().

        Args:
            hist (bool, optional): If True creates a histogram of utilization.
                Defaults to False.
            prop (str, optional): Optional PLEXOS line category to display.
                Defaults to None.
            start_date_range (str, optional): Defines a start date at which to
                represent data from.
                Defaults to None.
            end_date_range (str, optional): Defines an end date at which to
                represent data to.
                Defaults to None.

        Returns:
            dict: Dictionary containing the created plot and its data table.
        """
        outputs: dict = {}

        # List of properties needed by the plot, properties are a set of tuples and
        # contain 3 parts: required True/False, property name and scenarios required,
        # scenarios must be a list.
        properties = [
            (True, "line_Flow", self.Scenarios),
            (True, "line_Import_Limit", self.Scenarios),
        ]

        # Runs get_formatted_data within PlotDataStoreAndProcessor to populate
        # PlotDataStoreAndProcessor dictionary with all required properties,
        # returns a 1 if required data is missing
        check_input_data = self.get_formatted_data(properties)
        if 1 in check_input_data:
            return MissingInputData()

        # sets up x, y dimensions of plot
        ncols, nrows = set_facet_col_row_dimensions(
            self.xlabels, self.ylabels, facet=True, multi_scenario=self.Scenarios
        )
        grid_size = ncols * nrows
        # Used to calculate any excess axis to delete
        plot_number = len(self.Scenarios)
        excess_axs = grid_size - plot_number

        for zone_input in self.Zones:
            logger.info(f"For all lines touching Zone = {zone_input}")

            mplt = PlotLibrary(
                nrows, ncols, sharey=True, squeeze=False, ravel_axs=True
            )
            fig, axs = mplt.get_figure()
            plt.subplots_adjust(wspace=0.1, hspace=0.25)

            # Get and process all line limits
            line_limits = self.get_line_interface_limits(
                ["line_Import_Limit"],
            )

            data_table = []
            for n, scenario in enumerate(self.Scenarios):
                logger.info(f"Scenario = {str(scenario)}")
                # gets correct metadata based on area aggregation
                if self.AGG_BY == "zone":
                    zone_lines = self.meta.zone_lines(scenario)
                else:
                    zone_lines = self.meta.region_lines(scenario)
                try:
                    zone_lines = zone_lines.set_index([self.AGG_BY])
                except KeyError:
                    logger.warning("Column to Aggregate by is missing")
                    continue
                try:
                    zone_lines = zone_lines.xs(zone_input)
                    zone_lines = zone_lines["line_name"].unique()
                except KeyError:
                    logger.warning("No data to plot for scenario")
                    outputs[zone_input] = MissingZoneData()
                    continue

                flow = self["line_Flow"].get(scenario)
                # Limit to only lines touching to this zone
                flow = flow[flow.index.get_level_values("line_name").isin(zone_lines)]

                if pd.notna(start_date_range):
                    flow = set_timestamp_date_range(
                        flow, start_date_range, end_date_range
                    )
                    if flow.empty is True:
                        logger.warning("No data in selected Date Range")
                        continue
                if shift_leapday:
                    flow = adjust_for_leapday(flow)

                if "Scenario" in line_limits.index.names:
                    limits = line_limits.xs(scenario, level="Scenario")
                else:
                    limits = line_limits
                limits = limits.groupby("line_name").max().abs()

                # Filters on specified line category if prop is not None.
                if prop:
                    logger.info(f"Line category = {str(prop)}")
                    try:
                        flow = flow.xs(prop, level="category")
                    except KeyError:
                        logger.warning(
                            f"Line category {prop}, Not found in data! "
                            "All categories will be plotted."
                        )

                flow = pd.merge(flow, limits, on="line_name", how="left")
                flow["Util"] = (
                    flow["values"].abs() / flow["line_Import_Limit"]
                ).fillna(0)
                # If greater than 1 because exceeds flow limit, report as 1
                flow.loc[flow["Util"] > 1, "Util"] = 1
                annual_util = (
                    flow["Util"].groupby(["line_name"]).mean().rename(scenario)
                )
                # top annual utilized lines
                top_utilization = annual_util.nlargest(10, keep="first")

                color_dict = dict(zip(self.Scenarios, self.color_list))
                if hist is True:
                    mplt.histogram(annual_util, color_dict, label=scenario, sub_pos=n)
                else:
                    for line in top_utilization.index.get_level_values(
                        level="line_name"
                    ).unique():
                        duration_curve = (
                            flow.loc[line]
                            .sort_values(by="Util", ascending=False)
                            .reset_index(drop=True)
                        )
                        mplt.lineplot(duration_curve, "Util", label=line, sub_pos=n)
                        axs[n].set_ylim((0, 1.1))
                data_table.append(annual_util)

            mplt.add_legend()
            # Remove extra axes
            mplt.remove_excess_axs(excess_axs, grid_size)
            # add facet labels
            mplt.add_facet_labels(xlabels=self.xlabels, ylabels=self.ylabels)

            if hist is True:
                if prop:
                    prop_name = prop
                else:
                    prop_name = "All Lines"
                plt.ylabel(
                    "Number of lines", color="black", rotation="vertical", labelpad=30
                )
                plt.xlabel(
                    f"Line Utilization: {prop_name}",
                    color="black",
                    rotation="horizontal",
                    labelpad=30,
                )
            else:
                if prop:
                    prop_name = prop
                else:
                    prop_name = "Top 10 Lines"
                plt.ylabel(
                    f"Line Utilization: {prop_name}",
                    color="black",
                    rotation="vertical",
                    labelpad=60,
                )
                plt.xlabel(
                    "Intervals", color="black", rotation="horizontal", labelpad=20
                )
            if plot_data_settings["plot_title_as_region"]:
                mplt.add_main_title(zone_input)

            Data_Out = pd.concat(data_table)
            outputs[zone_input] = {"fig": fig, "data_table": Data_Out}
        return outputs
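
    # Worked example of the utilization calculation above (values are
    # hypothetical): a line flowing at 150 MW against a 200 MW import limit
    # yields Util = abs(150) / 200 = 0.75, and anything above the limit is
    # clipped to 1.0 before the per-line annual mean is taken.
    #
    # >>> import pandas as pd
    # >>> flow = pd.DataFrame({"values": [150, -220], "line_Import_Limit": [200, 200]})
    # >>> util = (flow["values"].abs() / flow["line_Import_Limit"]).clip(upper=1)
    # >>> util.tolist()
    # [0.75, 1.0]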
    # def int_flow_ind(self, figure_name: str = None, prop: str = None,
    #                  start_date_range: str = None,
    #                  end_date_range: str = None, **_):
    #     """Creates a line plot of interchange flows and their import and export limits.
    #
    #     Each interchange is plotted on a separate facet plot.
    #     The plot includes every interchange that originates or ends in the
    #     aggregation zone. This can be adjusted by passing a comma separated
    #     string of interchanges to the property input.
    #
    #     The code will create either a timeseries or duration curve depending on
    #     if the word 'duration_curve' is in the figure_name.
    #     To make a duration curve, ensure the word 'duration_curve' is found in
    #     the figure_name.
    #
    #     Args:
    #         figure_name (str, optional): User defined figure output name.
    #             Defaults to None.
    #         prop (str, optional): Comma separated string of interchanges.
    #             Defaults to None.
    #         start_date_range (str, optional): Defines a start date at which to represent data from.
    #             Defaults to None.
    #         end_date_range (str, optional): Defines an end date at which to represent data to.
    #             Defaults to None.
    #
    #     Returns:
    #         dict: dictionary containing the created plot and its data table
    #     """
    #     duration_curve = False
    #     if 'duration_curve' in figure_name:
    #         duration_curve = True
    #
    #     # List of properties needed by the plot, properties are a set of tuples and
    #     # contain 3 parts: required True/False, property name and scenarios required,
    #     # scenarios must be a list.
    #     properties = [(True, "interface_Flow", self.Scenarios),
    #                   (True, "interface_Import_Limit", self.Scenarios),
    #                   (True, "interface_Export_Limit", self.Scenarios)]
    #
    #     # Runs get_formatted_data within PlotDataStoreAndProcessor to populate
    #     # PlotDataStoreAndProcessor dictionary with all required properties,
    #     # returns a 1 if required data is missing
    #     check_input_data = self.get_formatted_data(properties)
    #     if 1 in check_input_data:
    #         return MissingInputData()
    #
    #     scenario = self.Scenarios[0]
    #     outputs: dict = {}
    #
    #     if pd.notna(start_date_range):
    #         logger.info(f"Plotting specific date range: "
    #                     f"{str(start_date_range)} to {str(end_date_range)}")
    #
    #     for zone_input in self.Zones:
    #         logger.info(f"For all interfaces touching Zone = {zone_input}")
    #
    #         data_table_out = pd.DataFrame()
    #
    #         # gets correct metadata based on area aggregation
    #         if self.AGG_BY == 'zone':
    #             zone_lines = self.meta.zone_lines(scenario)
    #         else:
    #             zone_lines = self.meta.region_lines(scenario)
    #         try:
    #             zone_lines = zone_lines.set_index([self.AGG_BY])
    #         except:
    #             logger.info("Column to Aggregate by is missing")
    #             continue
    #
    #         zone_lines = zone_lines.xs(zone_input)
    #         zone_lines = zone_lines['line_name'].unique()
    #
    #         # Map lines to interfaces
    #         all_ints = self.meta.interface_lines(scenario)
    #         all_ints.index = all_ints.line
    #         ints = all_ints.loc[all_ints.index.intersection(zone_lines)]
    #
    #         # flow = flow[flow.index.get_level_values('interface_name').isin(ints.interface)]  # Limit to only interfaces touching to this zone
    #         # flow = flow.droplevel('interface_category')
    #
    #         export_limits = self["interface_Export_Limit"].get(scenario).copy().droplevel('timestamp')
    #         export_limits.mask(export_limits[0] == 0.0, other=0.01, inplace=True)  # if limit is zero set to small value
    #         export_limits = export_limits[export_limits.index.get_level_values('interface_name').isin(ints.interface)]
    #         export_limits = export_limits[export_limits[0].abs() < 99998]  # Filter out unenforced interfaces.
    #         # Drop unnecessary columns.
    #         export_limits.reset_index(inplace=True)
    #         export_limits.drop(columns=['interface_category', 'units'], inplace=True)
    #         export_limits.set_index('interface_name', inplace=True)
    #
    #         import_limits = self["interface_Import_Limit"].get(scenario).copy().droplevel('timestamp')
    #         import_limits.mask(import_limits[0] == 0.0, other=0.01, inplace=True)  # if limit is zero set to small value
    #         import_limits = import_limits[import_limits.index.get_level_values('interface_name').isin(ints.interface)]
    #         import_limits = import_limits[import_limits[0].abs() < 99998]  # Filter out unenforced interfaces.
    #         reported_ints = import_limits.index.get_level_values('interface_name').unique()
    #         # Drop unnecessary columns.
    #         import_limits.reset_index(inplace=True)
    #         import_limits.drop(columns=['interface_category', 'units'], inplace=True)
    #         import_limits.set_index('interface_name', inplace=True)
    #
    #         # Extract time index
    #         ti = self["interface_Flow"][self.Scenarios[0]].index.get_level_values('timestamp').unique()
    #
    #         if prop:
    #             interf_list = prop.split(',')
    #             logger.info('Plotting only interfaces specified in Marmot_plot_select.csv')
    #             logger.info(interf_list)
    #         else:
    #             interf_list = reported_ints.copy()
    #             logger.info('Plotting full time series results.')
    #
    #         xdim, ydim = set_x_y_dimension(len(interf_list))
    #         mplt = PlotLibrary(ydim, xdim, squeeze=False, ravel_axs=True)
    #         fig, axs = mplt.get_figure()
    #         grid_size = xdim * ydim
    #         excess_axs = grid_size - len(interf_list)
    #         plt.subplots_adjust(wspace=0.05, hspace=0.2)
    #
    #         missing_ints = 0
    #         chunks = []
    #         n = -1
    #         for interf in interf_list:
    #             n += 1
    #             # Remove leading spaces
    #             if interf[0] == ' ':
    #                 interf = interf[1:]
    #             if interf in reported_ints:
    #                 chunks_interf = []
    #                 single_exp_lim = export_limits.loc[interf] / 1000  # TODO: Use auto unit converter
    #                 single_imp_lim = import_limits.loc[interf] / 1000
    #
    #                 # Check if all hours have the same limit.
    #                 check = single_exp_lim.to_numpy()
    #                 identical = check[0] == check.all()
    #
    #                 limits = pd.concat([single_exp_lim, single_imp_lim], axis=1)
    #                 limits.columns = ['export limit', 'import limit']
    #                 limits.index = ti
    #
    #                 for scenario in self.Scenarios:
    #                     flow = self["interface_Flow"].get(scenario)
    #                     single_int = flow.xs(interf, level='interface_name') / 1000
    #                     single_int.index = single_int.index.droplevel(['interface_category', 'units'])
    #                     single_int.columns = [interf]
    #                     single_int = single_int.reset_index().set_index('timestamp')
    #                     limits = limits.reset_index().set_index('timestamp')
    #                     if shift_leapday:
    #                         single_int = adjust_for_leapday(single_int)
    #                     if pd.notna(start_date_range):
    #                         single_int = single_int[start_date_range:end_date_range]
    #                         limits = limits[start_date_range:end_date_range]
    #                     if duration_curve:
    #                         single_int = sort_duration(single_int, interf)
    #
    #                     mplt.lineplot(single_int, interf,
    #                                   label=f"{scenario}\n interface flow",
    #                                   sub_pos=n)
    #
    #                     # Only print limits if it doesn't change monthly or if you
    #                     # are plotting a time series. Otherwise the limit lines
    #                     # could be misleading.
    #                     if not duration_curve or identical[0]:
    #                         if scenario == self.Scenarios[-1]:
    #                             # Only plot limits for last scenario.
    #                             limits_color_dict = {'export limit': 'red', 'import limit': 'green'}
    #                             mplt.lineplot(limits, 'export limit',
    #                                           label='export limit', color=limits_color_dict,
    #                                           linestyle='--', sub_pos=n)
    #                             mplt.lineplot(limits, 'import limit',
    #                                           label='import limit', color=limits_color_dict,
    #                                           linestyle='--', sub_pos=n)
    #
    #                     # For output time series .csv
    #                     scenario_names = pd.Series([scenario] * len(single_int), name='Scenario')
    #                     single_int_out = single_int.set_index([scenario_names], append=True)
    #                     chunks_interf.append(single_int_out)
    #
    #                 Data_out_line = pd.concat(chunks_interf, axis=0)
    #                 Data_out_line.columns = [interf]
    #                 chunks.append(Data_out_line)
    #
    #             else:
    #                 logger.warning(f"{interf} not found in results. Have you tagged "
    #                                "it with the 'Must Report' property in PLEXOS?")
    #                 excess_axs += 1
    #                 missing_ints += 1
    #                 continue
    #
    #             axs[n].set_title(interf)
    #             if not duration_curve:
    #                 mplt.set_subplot_timeseries_format(sub_pos=n)
    #
    #         if missing_ints == len(interf_list):
    #             outputs = MissingInputData()
    #             return outputs
    #
    #         data_table_out = pd.concat(chunks, axis=1)
    #         data_table_out = data_table_out.reset_index()
    #         index_name = 'level_0' if duration_curve else 'timestamp'
    #         data_table_out = data_table_out.pivot(index=index_name, columns='Scenario')
    #         # Limits_Out = pd.concat(limits_chunks,axis = 1)
    #         # Limits_Out.index = ['Export Limit','Import Limit']
    #
    #         # data_table_out = data_table_out.reset_index()
    #         # data_table_out = data_table_out.groupby(data_table_out.index // 24).mean()
    #         # data_table_out.index = pd.date_range(start = '1/1/2024',end = '12/31/2024',freq = 'D')
    #
    #         mplt.add_legend()
    #         plt.ylabel('Flow (GW)', color='black', rotation='vertical', labelpad=30)
    #         if duration_curve:
    #             plt.xlabel('Sorted hour of the year', color='black', labelpad=30)
    #         plt.tight_layout(rect=[0, 0.03, 1, 0.97])
    #         if plot_data_settings["plot_title_as_region"]:
    #             mplt.add_main_title(zone_input)
    #         outputs[zone_input] = {'fig': fig, 'data_table': data_table_out}
    #         # Limits_Out.to_csv(self.figure_folder.joinpath(self.AGG_BY + '_transmission','Individual_Interface_Limits.csv'))
    #     return outputs
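
    # The `properties` convention used by every plot in this module,
    # illustrated. The property names are real keys used elsewhere in this
    # file; the scenario names are hypothetical:
    #
    # >>> properties = [
    # ...     # (required?, property name, scenarios the property is needed for)
    # ...     (True, "line_Flow", ["base", "high_vre"]),
    # ...     (False, "line_Import_Limit", ["base", "high_vre"]),
    # ... ]
    # get_formatted_data(properties) loads each property into the class's
    # data store and returns a list containing 1 for any *required* property
    # that is missing.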
    def int_flow_ind_seasonal(
        self,
        figure_name: str = None,
        prop: str = None,
        start_date_range: str = None,
        end_date_range: str = None,
        **_,
    ):
        """#TODO: Finish Docstring

        Args:
            figure_name (str, optional): User defined figure output name.
                Defaults to None.
            prop (str, optional): Comma separated string of interchanges.
                Defaults to None.
            start_date_range (str, optional): Defines a start date at which to represent data from.
                Defaults to None.
            end_date_range (str, optional): Defines an end date at which to represent data to.
                Defaults to None.

        Returns:
            dict: dictionary containing the created plot and its data table
        """
        # TODO: Use auto unit converter in method

        duration_curve = False
        if "duration_curve" in figure_name:
            duration_curve = True

        # List of properties needed by the plot, properties are a set of tuples and
        # contain 3 parts: required True/False, property name and scenarios required,
        # scenarios must be a list.
        properties = [
            (True, "interface_Flow", self.Scenarios),
            (True, "interface_Import_Limit", self.Scenarios),
            (True, "interface_Export_Limit", self.Scenarios),
        ]

        # Runs get_formatted_data within PlotDataStoreAndProcessor to populate
        # PlotDataStoreAndProcessor dictionary with all required properties,
        # returns a 1 if required data is missing
        check_input_data = self.get_formatted_data(properties)
        if 1 in check_input_data:
            return MissingInputData()

        scenario = self.Scenarios[0]

        outputs: dict = {}
        for zone_input in self.Zones:
            logger.info(f"For all interfaces touching Zone = {zone_input}")

            data_table_out = pd.DataFrame()

            # gets correct metadata based on area aggregation
            if self.AGG_BY == "zone":
                zone_lines = self.meta.zone_lines(scenario)
            else:
                zone_lines = self.meta.region_lines(scenario)
            try:
                zone_lines = zone_lines.set_index([self.AGG_BY])
            except KeyError:
                logger.info("Column to Aggregate by is missing")
                continue

            zone_lines = zone_lines.xs(zone_input)
            zone_lines = zone_lines["line_name"].unique()

            # Map lines to interfaces
            all_ints = self.meta.interface_lines(scenario)
            all_ints.index = all_ints.line
            ints = all_ints.loc[all_ints.index.intersection(zone_lines)]

            # flow = flow[flow.index.get_level_values('interface_name').isin(ints.interface)]  # Limit to only interfaces touching to this zone
            # flow = flow.droplevel('interface_category')

            export_limits = (
                self["interface_Export_Limit"].get(scenario).droplevel("timestamp")
            )
            export_limits.mask(
                export_limits["values"] == 0.0, other=0.01, inplace=True
            )  # if limit is zero set to small value
            export_limits = export_limits[
                export_limits.index.get_level_values("interface_name").isin(
                    ints.interface
                )
            ]
            export_limits = export_limits[
                export_limits["values"].abs() < 99998
            ]  # Filter out unenforced interfaces.
            # Drop unnecessary columns.
            export_limits.reset_index(inplace=True)
            export_limits.drop(columns="interface_category", inplace=True)
            export_limits.set_index("interface_name", inplace=True)

            import_limits = (
                self["interface_Import_Limit"].get(scenario).droplevel("timestamp")
            )
            import_limits.mask(
                import_limits["values"] == 0.0, other=0.01, inplace=True
            )  # if limit is zero set to small value
            import_limits = import_limits[
                import_limits.index.get_level_values("interface_name").isin(
                    ints.interface
                )
            ]
            import_limits = import_limits[
                import_limits["values"].abs() < 99998
            ]  # Filter out unenforced interfaces.
            reported_ints = import_limits.index.get_level_values(
                "interface_name"
            ).unique()

            # Drop unnecessary columns.
            import_limits.reset_index(inplace=True)
            import_limits.drop(columns="interface_category", inplace=True)
            import_limits.set_index("interface_name", inplace=True)

            # Extract time index
            ti = (
                self["interface_Flow"][self.Scenarios[0]]
                .index.get_level_values("timestamp")
                .unique()
            )

            if prop:
                interf_list = prop.split(",")
                logger.info(
                    "Plotting only interfaces specified in Marmot_plot_select.csv"
                )
                logger.info(interf_list)
            else:
                interf_list = reported_ints.copy()

            logger.info(
                f"Carving out season from {start_date_range} to {end_date_range}"
            )

            # Remove missing interfaces from the list.
            # Iterate over a copy so removals don't skip elements.
            for interf in list(interf_list):
                # Remove leading spaces
                if interf[0] == " ":
                    interf = interf[1:]
                if interf not in reported_ints:
                    logger.warning(f"{interf} not found in results.")
                    interf_list.remove(interf)
            if not interf_list:
                outputs = MissingInputData()
                return outputs

            xdim = 2
            ydim = len(interf_list)
            mplt = PlotLibrary(ydim, xdim, squeeze=False)
            fig, axs = mplt.get_figure()
            grid_size = xdim * ydim
            excess_axs = grid_size - len(interf_list)
            plt.subplots_adjust(wspace=0.05, hspace=0.2)

            missing_ints = 0
            chunks = []
            limits_chunks = []
            n = -1
            for interf in interf_list:
                n += 1
                # Remove leading spaces
                if interf[0] == " ":
                    interf = interf[1:]

                chunks_interf = []
                single_exp_lim = export_limits.loc[interf] / 1000
                single_imp_lim = import_limits.loc[interf] / 1000

                # Check if all hours have the same limit.
                check = single_exp_lim.to_numpy()
                identical = check[0] == check.all()

                limits = pd.concat([single_exp_lim, single_imp_lim], axis=1)
                limits.columns = ["export limit", "import limit"]
                limits.index = ti

                for scenario in self.Scenarios:
                    flow = self["interface_Flow"].get(scenario)
                    single_int = flow.xs(interf, level="interface_name") / 1000
                    single_int.index = single_int.index.droplevel("interface_category")
                    single_int.columns = [interf]
                    if shift_leapday:
                        single_int = adjust_for_leapday(single_int)
                    summer = single_int[start_date_range:end_date_range]
                    winter = single_int.drop(summer.index)
                    summer_lim = limits[start_date_range:end_date_range]
                    winter_lim = limits.drop(summer.index)

                    if duration_curve:
                        summer = sort_duration(summer, interf)
                        winter = sort_duration(winter, interf)
                        summer_lim = sort_duration(summer_lim, "export limit")
                        winter_lim = sort_duration(winter_lim, "export limit")

                    axs[n, 0].plot(
                        summer[interf],
                        linewidth=1,
                        label=f"{scenario}\n interface flow",
                    )
                    axs[n, 1].plot(
                        winter[interf],
                        linewidth=1,
                        label=f"{scenario}\n interface flow",
                    )
                    if scenario == self.Scenarios[-1]:
                        for col in summer_lim:
                            limits_color_dict = {
                                "export limit": "red",
                                "import limit": "green",
                            }
                            axs[n, 0].plot(
                                summer_lim[col],
                                linewidth=1,
                                linestyle="--",
                                color=limits_color_dict[col],
                                label=col,
                            )
                            axs[n, 1].plot(
                                winter_lim[col],
                                linewidth=1,
                                linestyle="--",
                                color=limits_color_dict[col],
                                label=col,
                            )

                    # For output time series .csv
                    scenario_names = pd.Series(
                        [scenario] * len(single_int), name="Scenario"
                    )
                    single_int_out = single_int.set_index(
                        [scenario_names], append=True
                    )
                    chunks_interf.append(single_int_out)

                Data_out_line = pd.concat(chunks_interf, axis=0)
                Data_out_line.columns = [interf]
                chunks.append(Data_out_line)

                axs[n, 0].set_title(interf)
                axs[n, 1].set_title(interf)
                if not duration_curve:
                    locator = mdates.AutoDateLocator(minticks=4, maxticks=8)
                    formatter = mdates.ConciseDateFormatter(locator)
                    formatter.formats[2] = "%d\n %b"
                    formatter.zero_formats[1] = "%b\n %Y"
                    formatter.zero_formats[2] = "%d\n %b"
                    formatter.zero_formats[3] = "%H:%M\n %d-%b"
                    formatter.offset_formats[3] = "%b %Y"
                    formatter.show_offset = False
                    axs[n, 0].xaxis.set_major_locator(locator)
                    axs[n, 0].xaxis.set_major_formatter(formatter)
                    axs[n, 1].xaxis.set_major_locator(locator)
                    axs[n, 1].xaxis.set_major_formatter(formatter)

            mplt.add_legend()

            data_table_out = pd.concat(chunks, axis=1)
            # Limits_Out = pd.concat(limits_chunks,axis = 1)
            # Limits_Out.index = ['Export Limit','Import Limit']

            plt.ylabel("Flow (GW)", color="black", rotation="vertical", labelpad=30)
            if duration_curve:
                plt.xlabel("Sorted hour of the year", color="black", labelpad=30)

            fig.text(
                0.15,
                0.98,
                f"Summer ({start_date_range} to {end_date_range})",
                fontsize=16,
            )
            fig.text(0.58, 0.98, "Winter", fontsize=16)
            plt.tight_layout(rect=[0, 0.03, 1, 0.97])
            if plot_data_settings["plot_title_as_region"]:
                mplt.add_main_title(zone_input)

            outputs[zone_input] = {"fig": fig, "data_table": data_table_out}
            # Limits_Out.to_csv(self.figure_folder.joinpath(self.AGG_BY + '_transmission','Individual_Interface_Limits.csv'))
        return outputs
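
    # Sketch of the seasonal split used above (dates are hypothetical): the
    # "summer" facet column is the start/end date slice of the timeseries,
    # and "winter" is everything that remains.
    #
    # >>> import pandas as pd
    # >>> idx = pd.date_range("2024-01-01", periods=366, freq="D")
    # >>> ts = pd.DataFrame({"flow": range(366)}, index=idx)
    # >>> summer = ts["2024-06-01":"2024-09-30"]
    # >>> winter = ts.drop(summer.index)
    # >>> len(summer), len(winter)
    # (122, 244)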
# TODO: re-organize parameters (self vs. not self)
    def int_flow_ind_diff(self, figure_name: str = None, **_):
        """Plot under development

        This method plots the hourly difference in interface flow between two
        scenarios for individual interfaces, with a facet for each interface.
        The two scenarios are defined in the "scenario_diff" row of
        Marmot_user_defined_inputs.
        The interfaces are specified in the plot properties field of
        Marmot_plot_select.csv (column 4).
        The figure and data tables are saved within the module.

        Returns:
            UnderDevelopment(): Exception class, plot is not functional.
        """
        return UnderDevelopment()  # TODO: add new get_data method

        duration_curve = False
        if "duration_curve" in figure_name:
            duration_curve = True

        check_input_data = []
        Flow_Collection = {}
        Import_Limit_Collection = {}
        Export_Limit_Collection = {}

        check_input_data.extend(
            [
                get_data(
                    Flow_Collection,
                    "interface_Flow",
                    self.Marmot_Solutions_folder,
                    self.Scenarios,
                )
            ]
        )
        check_input_data.extend(
            [
                get_data(
                    Import_Limit_Collection,
                    "interface_Import_Limit",
                    self.Marmot_Solutions_folder,
                    self.Scenarios,
                )
            ]
        )
        check_input_data.extend(
            [
                get_data(
                    Export_Limit_Collection,
                    "interface_Export_Limit",
                    self.Marmot_Solutions_folder,
                    self.Scenarios,
                )
            ]
        )

        if 1 in check_input_data:
            outputs = MissingInputData()
            return outputs

        scenario = self.Scenarios[0]

        outputs: dict = {}

        if not pd.isnull(self.start_date):
            logger.info(
                f"Plotting specific date range: "
                f"{str(self.start_date)} to {str(self.end_date)}"
            )

        for zone_input in self.Zones:
            logger.info(f"For all interfaces touching Zone = {zone_input}")

            data_table_out = pd.DataFrame()

            # gets correct metadata based on area aggregation
            if self.AGG_BY == "zone":
                zone_lines = self.meta.zone_lines(scenario)
            else:
                zone_lines = self.meta.region_lines(scenario)
            try:
                zone_lines = zone_lines.set_index([self.AGG_BY])
            except KeyError:
                logger.info("Column to Aggregate by is missing")
                continue

            zone_lines = zone_lines.xs(zone_input)
            zone_lines = zone_lines["line_name"].unique()

            # Map lines to interfaces
            all_ints = self.meta.interface_lines(scenario)
            all_ints.index = all_ints.line
            ints = all_ints.loc[all_ints.index.intersection(zone_lines)]

            # flow = flow[flow.index.get_level_values('interface_name').isin(ints.interface)]  # Limit to only interfaces touching to this zone
            # flow = flow.droplevel('interface_category')

            export_limits = Export_Limit_Collection.get(scenario).droplevel("timestamp")
            export_limits.mask(
                export_limits["values"] == 0.0, other=0.01, inplace=True
            )  # if limit is zero set to small value
            export_limits = export_limits[
                export_limits.index.get_level_values("interface_name").isin(
                    ints.interface
                )
            ]
            export_limits = export_limits[
                export_limits["values"].abs() < 99998
            ]  # Filter out unenforced interfaces.
            # Drop unnecessary columns.
            export_limits.reset_index(inplace=True)
            export_limits.drop(columns="interface_category", inplace=True)
            export_limits.set_index("interface_name", inplace=True)

            import_limits = Import_Limit_Collection.get(scenario).droplevel("timestamp")
            import_limits.mask(
                import_limits["values"] == 0.0, other=0.01, inplace=True
            )  # if limit is zero set to small value
            import_limits = import_limits[
                import_limits.index.get_level_values("interface_name").isin(
                    ints.interface
                )
            ]
            import_limits = import_limits[
                import_limits["values"].abs() < 99998
            ]  # Filter out unenforced interfaces.
            reported_ints = import_limits.index.get_level_values(
                "interface_name"
            ).unique()

            # Drop unnecessary columns.
            import_limits.reset_index(inplace=True)
            import_limits.drop(columns="interface_category", inplace=True)
            import_limits.set_index("interface_name", inplace=True)

            # Extract time index
            ti = (
                Flow_Collection[self.Scenarios[0]]
                .index.get_level_values("timestamp")
                .unique()
            )

            if self.prop != "":
                interf_list = self.prop.split(",")
                logger.info(
                    "Plotting only interfaces specified in Marmot_plot_select.csv"
                )
                logger.info(interf_list)
            else:
                interf_list = reported_ints.copy()
                logger.info("Plotting full time series results.")

            xdim, ydim = set_x_y_dimension(len(interf_list))
            mplt = PlotLibrary(ydim, xdim, squeeze=False, ravel_axs=True)
            fig, axs = mplt.get_figure()
            grid_size = xdim * ydim
            excess_axs = grid_size - len(interf_list)
            plt.subplots_adjust(wspace=0.05, hspace=0.2)

            missing_ints = 0
            chunks = []
            limits_chunks = []
            n = -1
            for interf in interf_list:
                n += 1
                # Remove leading spaces
                if interf[0] == " ":
                    interf = interf[1:]

                if interf in reported_ints:
                    chunks_interf = []
                    single_exp_lim = (
                        export_limits.loc[interf] / 1000
                    )  # TODO: Use auto unit converter in method
                    single_imp_lim = import_limits.loc[interf] / 1000

                    # Check if all hours have the same limit.
                    check = single_exp_lim.to_numpy()
                    identical = check[0] == check.all()

                    limits = pd.concat([single_exp_lim, single_imp_lim], axis=1)
                    limits.columns = ["export limit", "import limit"]
                    limits.index = ti

                    for scenario in self.Scenarios:
                        flow = Flow_Collection.get(scenario)
                        single_int = flow.xs(interf, level="interface_name") / 1000
                        single_int.index = single_int.index.droplevel(
                            "interface_category"
                        )
                        single_int.columns = [interf]
                        if shift_leapday:
                            single_int = adjust_for_leapday(single_int)

                        single_int = single_int.reset_index().set_index("timestamp")
                        limits = limits.reset_index().set_index("timestamp")

                        if not pd.isnull(self.start_date):
                            single_int = single_int[self.start_date : self.end_date]
                            limits = limits[self.start_date : self.end_date]

                        if duration_curve:
                            single_int = sort_duration(single_int, interf)

                        mplt.lineplot(
                            single_int,
                            interf,
                            label=f"{scenario}\n interface flow",
                            sub_pos=n,
                        )

                        # Only print limits if it doesn't change monthly or if you
                        # are plotting a time series. Otherwise the limit lines
                        # could be misleading.
                        if not duration_curve or identical[0]:
                            if scenario == self.Scenarios[-1]:
                                # Only plot limits for last scenario.
                                limits_color_dict = {
                                    "export limit": "red",
                                    "import limit": "green",
                                }
                                mplt.lineplot(
                                    limits,
                                    "export limit",
                                    label="export limit",
                                    color=limits_color_dict,
                                    linestyle="--",
                                    sub_pos=n,
                                )
                                mplt.lineplot(
                                    limits,
                                    "import limit",
                                    label="import limit",
                                    color=limits_color_dict,
                                    linestyle="--",
                                    sub_pos=n,
                                )

                        # For output time series .csv
                        scenario_names = pd.Series(
                            [scenario] * len(single_int), name="Scenario"
                        )
                        single_int_out = single_int.set_index(
                            [scenario_names], append=True
                        )
                        chunks_interf.append(single_int_out)

                    Data_out_line = pd.concat(chunks_interf, axis=0)
                    Data_out_line.columns = [interf]
                    chunks.append(Data_out_line)

                else:
                    logger.warning(
                        f"{interf} not found in results. Have you tagged it "
                        'with the "Must Report" property in PLEXOS?'
                    )
                    excess_axs += 1
                    missing_ints += 1
                    continue

                axs[n].set_title(interf)
                handles, labels = axs[n].get_legend_handles_labels()
                if not duration_curve:
                    self.set_subplot_timeseries_format(axs, sub_pos=n)
                if n == len(interf_list) - 1:
                    axs[n].legend(loc="lower left", bbox_to_anchor=(1.05, -0.2))

            if missing_ints == len(interf_list):
                outputs = MissingInputData()
                return outputs

            data_table_out = pd.concat(chunks, axis=1)
            data_table_out = data_table_out.reset_index()
            index_name = "level_0" if duration_curve else "timestamp"
            data_table_out = data_table_out.pivot(index=index_name, columns="Scenario")
            # Limits_Out = pd.concat(limits_chunks,axis = 1)
            # Limits_Out.index = ['Export Limit','Import Limit']

            # data_table_out = data_table_out.reset_index()
            # data_table_out = data_table_out.groupby(data_table_out.index // 24).mean()
            # data_table_out.index = pd.date_range(start = '1/1/2024',end = '12/31/2024',freq = 'D')

            plt.ylabel("Flow (GW)", color="black", rotation="vertical", labelpad=30)
            if duration_curve:
                plt.xlabel("Sorted hour of the year", color="black", labelpad=30)

            plt.tight_layout(rect=[0, 0.03, 1, 0.97])
            if plot_data_settings["plot_title_as_region"]:
                mplt.add_main_title(zone_input)

            outputs[zone_input] = {"fig": fig, "data_table": data_table_out}
            # Limits_Out.to_csv(self.figure_folder.joinpath(self.AGG_BY + '_transmission','Individual_Interface_Limits.csv'))
        return outputs
    def interface_flow_ind(
        self, figure_name: str = "Individual_Interface_Flow", prop: str = None, **kwargs
    ):
        """Timeseries or duration curve of individual interfaces.

        This method plots flow, import and export limit, for individual
        transmission interfaces, with a facet for each interface.
        The interfaces are specified in the plot properties field of
        Marmot_plot_select.csv (column 4).
        Figures and data tables are saved within the method.

        Args:
            figure_name (str, optional): Figure output name.
                Defaults to "Individual_Interface_Flow".
            prop (str, optional): Used to pass in interface names.
                Input format should be a comma separated string.
                Defaults to None.
            **kwargs: Any additional parameters will be passed to the
                _tx_flow_ind method.

        Returns:
            DataSavedInModule: DataSavedInModule class
        """
        if prop is None:
            return InputSheetError()
        return self._tx_flow_ind(
            connection="interface", figure_name=figure_name, prop=prop, **kwargs
        )
    def line_flow_ind(
        self, figure_name: str = "Individual_Line_Flow", prop: str = None, **kwargs
    ):
        """Timeseries or duration curve of individual lines.

        This method plots flow, import and export limit, for individual
        transmission lines, with a facet for each line.
        The lines are specified in the plot properties field of
        Marmot_plot_select.csv (column 4).
        Figures and data tables are saved within the method.

        Args:
            figure_name (str, optional): Figure output name.
                Defaults to "Individual_Line_Flow".
            prop (str, optional): Used to pass in line names.
                Input format should be a comma separated string.
                Defaults to None.
            **kwargs: Any additional parameters will be passed to the
                _tx_flow_ind method.

        Returns:
            DataSavedInModule: DataSavedInModule class
        """
        if prop is None:
            return InputSheetError()
        return self._tx_flow_ind(
            connection="line", figure_name=figure_name, prop=prop, **kwargs
        )
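
    # Hedged examples of the shared prop format for both wrappers; the line
    # and interface names are hypothetical:
    #
    # >>> tx.line_flow_ind(prop="Line_A, Line_B")
    # >>> tx.interface_flow_ind(prop="Interface_1, Interface_2",
    # ...                       figure_name="Interface_Flow_duration_curve")
    # Including 'duration_curve' in figure_name switches the output from a
    # timeseries to a duration curve.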
    def _tx_flow_ind(
        self,
        connection: str,
        figure_name: str,
        prop: str,
        start_date_range: str = None,
        end_date_range: str = None,
        **_,
    ):
        """Creates timeseries or duration curve of individual lines or interfaces.

        This method is called from interface_flow_ind() and line_flow_ind().

        Args:
            connection (str): Either 'line' or 'interface'.
            figure_name (str): Figure output name.
            prop (str): Used to pass in line/interface names.
                Input format should be a comma separated string.
            start_date_range (str, optional): Defines a start date at which to
                represent data from.
                Defaults to None.
            end_date_range (str, optional): Defines an end date at which to
                represent data to.
                Defaults to None.

        Returns:
            DataSavedInModule: DataSavedInModule class
        """
        duration_curve = False
        if "duration_curve" in figure_name:
            duration_curve = True

        outputs: dict = {}

        # List of properties needed by the plot, properties are a set of tuples and
        # contain 3 parts: required True/False, property name and scenarios required,
        # scenarios must be a list.
        properties = [
            (True, f"{connection}_Flow", self.Scenarios),
            (False, f"{connection}_Import_Limit", self.Scenarios),
            (False, f"{connection}_Export_Limit", self.Scenarios),
        ]

        # Runs get_formatted_data within PlotDataStoreAndProcessor to populate
        # PlotDataStoreAndProcessor dictionary with all required properties,
        # returns a 1 if required data is missing
        check_input_data = self.get_formatted_data(properties)
        if 1 in check_input_data:
            return MissingInputData()

        # Select only lines specified in Marmot_plot_select.csv.
        select_lines = [x.strip() for x in prop.split(",")]
        logger.info(f"Plotting only {connection}s specified in Marmot_plot_select.csv")
        logger.info(select_lines)

        xdim, ydim = set_x_y_dimension(len(select_lines))
        mplt = PlotLibrary(ydim, xdim, squeeze=False, ravel_axs=True, sharey=True)
        fig, axs = mplt.get_figure()

        data_tables = []
        for n, line in enumerate(select_lines):
            logger.info(f"{connection} = {line}")

            chunks_line = []
            for scenario in self.Scenarios:
                logger.info(f"Scenario = {scenario}")

                flow = self[f"{connection}_Flow"].get(scenario)
                try:
                    single_line = flow.xs(line, level=f"{connection}_name")
                except KeyError:
                    logger.warning(
                        f"{line} not found in results. Have you tagged "
                        "it with the 'Must Report' property in PLEXOS?"
                    )
                    return MissingInputData()
                single_line = single_line.droplevel("units")
                if "category" in single_line.index.names:
                    single_line = single_line.droplevel("category")
                elif "interface_category" in single_line.index.names:
                    single_line = single_line.droplevel("interface_category")
                single_line = single_line.rename(columns={"values": scenario})
                if shift_leapday:
                    single_line = adjust_for_leapday(single_line)

                if pd.notna(start_date_range):
                    single_line = set_timestamp_date_range(
                        single_line, start_date_range, end_date_range
                    )
                    if single_line.empty is True:
                        logger.warning("No data in selected Date Range")
                        continue
                chunks_line.append(single_line)

                # For output time series .csv
                single_line_out = single_line.rename(
                    columns={scenario: f"{connection} Flow"}
                )
                scenario_names = pd.Series(
                    [scenario] * len(single_line_out), name="Scenario"
                )
                line_name = pd.Series([line] * len(single_line_out), name=connection)
                single_line_out = single_line_out.set_index(
                    [scenario_names, line_name], append=True
                )
                data_tables.append(single_line_out)

            if chunks_line:
                line_flow_out = pd.concat(chunks_line, axis=1)
            else:
                return MissingInputData()

            # Only convert on first lines
            if n == 0:
                unitconversion = self.capacity_energy_unitconversion(
                    line_flow_out, self.Scenarios, sum_values=False
                )
            line_flow_out = line_flow_out / unitconversion["divisor"]

            legend_order = []
            # Plot line flow
            for column in line_flow_out:
                if duration_curve:
                    line_flow_single = sort_duration(line_flow_out, column)
                else:
                    line_flow_single = line_flow_out
                legend_label = f"{column}"
                mplt.lineplot(line_flow_single, column, label=legend_label, sub_pos=n)
                axs[n].axhline(y=0, linestyle=":", color="gray")
                legend_order.append(legend_label)

            # Get and process all line limits
            line_limits = (
                self.get_line_interface_limits(
                    [
                        f"{connection}_Export_Limit",
                        f"{connection}_Import_Limit",
                    ],
                    line,
                )
                / unitconversion["divisor"]
            )
            # Plot line limits
            self.plot_line_interface_limits(line_limits, mplt, n, duration_curve)

            axs[n].set_title(line)
            if not duration_curve:
                mplt.set_subplot_timeseries_format(sub_pos=n)

        data_table_out = pd.concat(data_tables, axis=0) / unitconversion["divisor"]
        data_table_out = data_table_out.add_suffix(f" ({unitconversion['units']})")

        legend_order.extend(["Export Limit", "Import Limit"])
        mplt.add_legend(sort_by=legend_order)
        plt.ylabel(
            f"Flow ({unitconversion['units']})",
            color="black",
            rotation="vertical",
            labelpad=30,
        )
        # plt.tight_layout()
        if duration_curve:
            plt.xlabel("Intervals", color="black", rotation="horizontal", labelpad=20)

        fn_suffix = "_duration_curve" if duration_curve else ""

        fig.savefig(
            self.figure_folder.joinpath(
                f"{self.AGG_BY}_transmission", f"{figure_name}{fn_suffix}.svg"
            ),
            dpi=600,
            bbox_inches="tight",
        )
        data_table_out.to_csv(
            self.figure_folder.joinpath(
                f"{self.AGG_BY}_transmission", f"{figure_name}{fn_suffix}.csv"
            )
        )
        outputs = DataSavedInModule()
        return outputs
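
    # Sketch of what a duration curve is, independent of Marmot's own
    # sort_duration helper: values are sorted descending and re-indexed by
    # interval, so the x-axis reads "number of intervals at or above y".
    #
    # >>> import pandas as pd
    # >>> flows = pd.Series([10, 40, 20, 30])
    # >>> flows.sort_values(ascending=False).reset_index(drop=True).tolist()
    # [40, 30, 20, 10]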
    def line_flow_ind_diff(self, figure_name: str = None, prop: str = None, **_):
        """#TODO: Finish Docstring

        This method plots the flow difference for individual transmission lines,
        with a facet for each line.
        The scenarios are specified in the "Scenario_Diff_plot" field of
        Marmot_user_defined_inputs.csv.
        The lines are specified in the plot properties field of
        Marmot_plot_select.csv (column 4).
        Figures and data tables are saved in the module.

        Args:
            figure_name (str, optional): [description]. Defaults to None.
            prop (str, optional): [description]. Defaults to None.

        Returns:
            [type]: [description]
        """
        # TODO: Use auto unit converter in method

        duration_curve = False
        if "duration_curve" in figure_name:
            duration_curve = True

        # List of properties needed by the plot, properties are a set of tuples and
        # contain 3 parts: required True/False, property name and scenarios required,
        # scenarios must be a list.
        properties = [(True, "line_Flow", self.Scenarios)]

        # Runs get_formatted_data within PlotDataStoreAndProcessor to populate
        # PlotDataStoreAndProcessor dictionary with all required properties,
        # returns a 1 if required data is missing
        check_input_data = self.get_formatted_data(properties)
        if 1 in check_input_data:
            outputs = MissingInputData()
            return outputs

        # Select only lines specified in Marmot_plot_select.csv.
        if prop is None:
            outputs = InputSheetError()
            return outputs
        select_lines = prop.split(",")

        logger.info("Plotting only lines specified in Marmot_plot_select.csv")
        logger.info(select_lines)

        flow_diff = self["line_Flow"].get(self.scenario_diff[1]) - self[
            "line_Flow"
        ].get(self.scenario_diff[0])

        xdim, ydim = set_x_y_dimension(len(select_lines))
        grid_size = xdim * ydim
        excess_axs = grid_size - len(select_lines)
        mplt = PlotLibrary(ydim, xdim, squeeze=False, ravel_axs=True)
        fig, axs = mplt.get_figure()
        plt.subplots_adjust(wspace=0.05, hspace=0.2)

        reported_lines = (
            self["line_Flow"]
            .get(self.Scenarios[0])
            .index.get_level_values("line_name")
            .unique()
        )

        n = -1
        missing_lines = 0
        chunks = []
        for line in select_lines:
            n += 1
            # Remove leading spaces
            if line[0] == " ":
                line = line[1:]

            if line in reported_lines:
                single_line = flow_diff.xs(line, level="line_name")
                single_line.columns = [line]
                if shift_leapday:
                    single_line = adjust_for_leapday(single_line)
                single_line_out = single_line.copy()
                if duration_curve:
                    single_line = sort_duration(single_line, line)

                mplt.lineplot(
                    single_line,
                    line,
                    label=f"{self.scenario_diff[1]} - \n"
                    f"{self.scenario_diff[0]}\n line flow",
                    sub_pos=n,
                )

            else:
                logger.warning(
                    f"{line} not found in results. Have you tagged it with "
                    'the "Must Report" property in PLEXOS?'
                )
                excess_axs += 1
                missing_lines += 1
                continue

            mplt.remove_excess_axs(excess_axs, grid_size)
            axs[n].set_title(line)
            if not duration_curve:
                mplt.set_subplot_timeseries_format(sub_pos=n)
            chunks.append(single_line_out)

        if missing_lines == len(select_lines):
            outputs = MissingInputData()
            return outputs

        data_table_out = pd.concat(chunks, axis=1)

        mplt.add_legend()
        plt.ylabel(
            "Flow difference (MW)", color="black", rotation="vertical", labelpad=30
        )
        plt.tight_layout()

        fn_suffix = "_duration_curve" if duration_curve else ""
        fig.savefig(
            self.figure_folder.joinpath(
                self.AGG_BY + "_transmission", figure_name + fn_suffix + ".svg"
            ),
            dpi=600,
            bbox_inches="tight",
        )
        data_table_out.to_csv(
            self.figure_folder.joinpath(
                self.AGG_BY + "_transmission", figure_name + fn_suffix + ".csv"
            )
        )
        outputs = DataSavedInModule()
        return outputs
    def line_flow_ind_seasonal(
        self,
        figure_name: str = None,
        prop: str = None,
        start_date_range: str = None,
        end_date_range: str = None,
        **_,
    ):
        """TODO: Finish Docstring.

        This method differs from the previous method, in that it plots seasonal
        line limits. To use this method, line import/export must be an
        "interval" property, not a "year" property. This can be selected in
        "plexos_properties.csv". Re-run the formatter if necessary, it will
        overwrite the existing properties in "*_formatted.h5".

        This method plots flow, import and export limit, for individual
        transmission lines, with a facet for each line.
        The lines are specified in the plot properties field of
        Marmot_plot_select.csv (column 4).
        The plot includes every interchange that originates or ends in the
        aggregation zone.
        Figures and data tables saved in the module.

        Args:
            figure_name (str, optional): [description]. Defaults to None.
            prop (str, optional): [description]. Defaults to None.
            start_date_range (str, optional): [description]. Defaults to None.
            end_date_range (str, optional): [description]. Defaults to None.

        Returns:
            DataSavedInModule: DataSavedInModule class
        """
        # TODO: Use auto unit converter in method

        if pd.isna(start_date_range):
            logger.warning(
                "You are attempting to plot a time series facetted by two seasons, "
                'but you are missing a value in the "Start Date" column of '
                '"Marmot_plot_select.csv". Please enter dates in "Start Date" and '
                '"End Date". These will define the bounds of one of your two '
                "seasons. The other season will be comprised of the rest of the year."
            )
            return MissingInputData()

        duration_curve = False
        if "duration_curve" in figure_name:
            duration_curve = True

        # List of properties needed by the plot, properties are a set of tuples and
        # contain 3 parts: required True/False, property name and scenarios required,
        # scenarios must be a list.
        properties = [
            (True, "line_Flow", self.Scenarios),
            (True, "line_Import_Limit", self.Scenarios),
            (True, "line_Export_Limit", self.Scenarios),
        ]

        # Runs get_formatted_data within PlotDataStoreAndProcessor to populate
        # PlotDataStoreAndProcessor dictionary with all required properties,
        # returns a 1 if required data is missing
        check_input_data = self.get_formatted_data(properties)
        if 1 in check_input_data:
            return MissingInputData()

        # Select only lines specified in Marmot_plot_select.csv.
        if prop is None:
            return InputSheetError()
        select_lines = prop.split(",")

        logger.info("Plotting only lines specified in Marmot_plot_select.csv")
        logger.info(select_lines)

        scenario = self.Scenarios[0]

        # Line limits are seasonal.
        export_limits = self["line_Export_Limit"].get(scenario).droplevel("timestamp")
        export_limits.mask(
            export_limits["values"] == 0.0, other=0.01, inplace=True
        )  # if limit is zero set to small value
        export_limits = export_limits[
            export_limits["values"].abs() < 99998
        ]  # Filter out unenforced lines.

        import_limits = self["line_Import_Limit"].get(scenario).droplevel("timestamp")
        import_limits.mask(
            import_limits["values"] == 0.0, other=0.01, inplace=True
        )  # if limit is zero set to small value
        import_limits = import_limits[
            import_limits["values"].abs() < 99998
        ]  # Filter out unenforced lines.

        # Extract time index
        ti = (
            self["line_Flow"][self.Scenarios[0]]
            .index.get_level_values("timestamp")
            .unique()
        )
        reported_lines = (
            self["line_Flow"][self.Scenarios[0]]
            .index.get_level_values("line_name")
            .unique()
        )

        logger.info(
            f"Carving out season from {start_date_range} to {end_date_range}"
        )

        # Remove missing lines from the list.
        # Iterate over a copy so removals don't skip elements.
        for line in list(select_lines):
            # Remove leading spaces
            if line[0] == " ":
                line = line[1:]
            if line not in reported_lines:
                logger.warning(f"{line} not found in results.")
                select_lines.remove(line)
        if not select_lines:
            outputs = MissingInputData()
            return outputs

        xdim = 2
        ydim = len(select_lines)
        grid_size = xdim * ydim
        excess_axs = grid_size - len(select_lines)

        mplt = PlotLibrary(ydim, xdim, squeeze=False)
        fig, axs = mplt.get_figure()

        i = -1
        missing_lines = 0
        chunks = []
        limits_chunks = []
        for line in select_lines:
            i += 1
            # Remove leading spaces
            if line[0] == " ":
                line = line[1:]

            chunks_line = []

            single_exp_lim = export_limits.loc[line]
            single_exp_lim.index = ti
            single_imp_lim = import_limits.loc[line]
            single_imp_lim.index = ti

            limits = pd.concat([single_exp_lim, single_imp_lim], axis=1)
            limits.columns = ["export limit", "import limit"]
            limits.index = ti
            limits_chunks.append(limits)

            for scenario in self.Scenarios:
                flow = self["line_Flow"][scenario]
                single_line = flow.xs(line, level="line_name")
                single_line = single_line.droplevel("units")
                single_line_out = single_line.copy()
                single_line.columns = [line]
                if shift_leapday:
                    single_line = adjust_for_leapday(single_line)

                # Split into seasons.
                summer = single_line[start_date_range:end_date_range]
                winter = single_line.drop(summer.index)
                summer_lim = limits[start_date_range:end_date_range]
                winter_lim = limits.drop(summer.index)

                if duration_curve:
                    summer = sort_duration(summer, line)
                    winter = sort_duration(winter, line)
                    summer_lim = sort_duration(summer_lim, "export limit")
                    winter_lim = sort_duration(winter_lim, "export limit")

                axs[i, 0].plot(
                    summer[line], linewidth=1, label=f"{scenario}\n line flow"
                )
                axs[i, 1].plot(
                    winter[line], linewidth=1, label=f"{scenario}\n line flow"
                )
                if scenario == self.Scenarios[-1]:
                    for col in summer_lim:
                        limits_color_dict = {
                            "export limit": "red",
                            "import limit": "green",
                        }
                        axs[i, 0].plot(
                            summer_lim[col],
                            linewidth=1,
                            linestyle="--",
                            color=limits_color_dict[col],
                            label=col,
                        )
                        axs[i, 1].plot(
                            winter_lim[col],
                            linewidth=1,
                            linestyle="--",
                            color=limits_color_dict[col],
                            label=col,
                        )

                for j in [0, 1]:
                    axs[i, j].spines["right"].set_visible(False)
                    axs[i, j].spines["top"].set_visible(False)
                    axs[i, j].tick_params(axis="y", which="major", length=5, width=1)
                    axs[i, j].tick_params(axis="x", which="major", length=5, width=1)
                    axs[i, j].set_title(line)
                    if i == len(select_lines) - 1:
                        axs[i, j].legend(
                            loc="lower left",
                            bbox_to_anchor=(1.05, 0),
                            facecolor="inherit",
                            frameon=True,
                        )

                # For output time series .csv
                scenario_names = pd.Series(
                    [scenario] * len(single_line_out), name="Scenario"
                )
                single_line_out.columns = [line]
                single_line_out = single_line_out.set_index(
                    [scenario_names], append=True
                )
                chunks_line.append(single_line_out)

            Data_out_line = pd.concat(chunks_line, axis=0)
            chunks.append(Data_out_line)

        if missing_lines == len(select_lines):
            outputs = MissingInputData()
            return outputs

        data_table_out = pd.concat(chunks, axis=1)
        # Limits_Out = pd.concat(limits_chunks,axis = 1)
        # Limits_Out.index = ['Export Limit','Import Limit']

        fig.text(0.3, 1, "Summer (Jun - Sep)")
        fig.text(0.6, 1, "Winter (Jan - Mar,Oct - Dec)")
        plt.ylabel("Flow (MW)", color="black", rotation="vertical", labelpad=30)
        plt.tight_layout()

        fn_suffix = "_duration_curve" if duration_curve else ""
        fig.savefig(
            self.figure_folder.joinpath(
                self.AGG_BY + "_transmission",
                "Individual_Line_Flow" + fn_suffix + "_seasonal.svg",
            ),
            dpi=600,
            bbox_inches="tight",
        )
        data_table_out.to_csv(
            self.figure_folder.joinpath(
                self.AGG_BY + "_transmission",
                "Individual_Line_Flow" + fn_suffix + "_seasonal.csv",
            )
        )
        # Limits_Out.to_csv(self.figure_folder.joinpath(self.AGG_BY + '_transmission','Individual_Line_Limits.csv'))
        outputs = DataSavedInModule()
        return outputs
def _grab_zone_net_load(self, zone_input, scenario, interconnect_aggby=False): fxnaggregator = self.AGG_BY # what is the interconnect for this zone? print(interconnect_aggby) if interconnect_aggby: Stacked_Gen_A = ( self.mplot_data_dict["generator_Generation"].get(scenario).copy() ) Stacked_Gen_A = Stacked_Gen_A.reset_index() print(Stacked_Gen_A.columns) fxnaggregator = "Interconnection" zone_input = Stacked_Gen_A[Stacked_Gen_A[self.AGG_BY] == zone_input][ fxnaggregator ].unique()[0] try: Stacked_Gen = ( self.mplot_data_dict["generator_Generation"].get(scenario).copy() ) if self.shift_leapday is True: Stacked_Gen = self.shift_leapday( Stacked_Gen, self.Marmot_Solutions_folder ) Stacked_Gen = Stacked_Gen.xs(zone_input, level=fxnaggregator) except KeyError: self.logger.warning(f"No generation in {zone_input}") out = MissingZoneData() return out Stacked_Gen = self.df_process_gen_inputs(Stacked_Gen, self.ordered_gen) curtailment_name = self.gen_names_dict.get("Curtailment", "Curtailment") # print(self.mplot_data_dict[f"generator_Curtailment"].get(scenario).copy()) # Insert Curtailmnet into gen stack if it exhists in database if self.mplot_data_dict[f"generator_{curtailment_name}"]: Stacked_Curt = ( self.mplot_data_dict[f"generator_{curtailment_name}"] .get(scenario) .copy() ) if self.shift_leapday is True: Stacked_Curt = self.shift_leapday( Stacked_Curt, self.Marmot_Solutions_folder ) if ( zone_input in Stacked_Curt.index.get_level_values(fxnaggregator).unique() ): Stacked_Curt = Stacked_Curt.xs(zone_input, level=fxnaggregator) Stacked_Curt = self.df_process_gen_inputs( Stacked_Curt, self.ordered_gen ) Stacked_Curt = Stacked_Curt.sum(axis=1) Stacked_Curt[Stacked_Curt < 0.05] = 0 # Remove values less than 0.05 MW Stacked_Gen.insert( len(Stacked_Gen.columns), column=curtailment_name, value=Stacked_Curt, ) # Insert curtailment into # Calculates Net Load by removing variable gen + curtailment vre_gen_cat = self.gen_categories.vre + [curtailment_name] else: vre_gen_cat = self.gen_categories.vre else: vre_gen_cat = self.gen_categories.vre # vre_gen_cat = self.gen_categories.vre vre_gen_cat = [name for name in vre_gen_cat if name in Stacked_Gen.columns] Net_Load = Stacked_Gen.drop(labels=vre_gen_cat, axis=1) Net_Load = Net_Load.sum(axis=1) return (Net_Load, zone_input) def line_scatter( self, figure_name=None, prop=None, start=None, end=None, timezone=None, start_date_range=None, end_date_range=None, ): if "," in prop: prop = tuple(prop.replace(" ", "").split(",")) else: prop = prop + ", " prop = tuple(prop.split(",")) # assert prop in ['Generation','Curtailment','Pump_Load','Load','Unserved_Energy', 'Net_Interchange'] # return self.UnderDevelopment() # then we can rank-order the hours based on the thing we wish to facet on facet = False if "Facet" in figure_name: facet = True if self.AGG_BY == "zone": agg = "zone" else: agg = "region" def set_dicts(scenario_list): # List of properties needed by the plot, properties are a set of tuples and contain 3 parts: # required True/False, property name and scenarios required, scenarios must be a list. 
properties = [ (True, "generator_Generation", scenario_list), (False, "generator_Curtailment", scenario_list), (False, "generator_Pump_Load", scenario_list), (True, f"{agg}_Load", scenario_list), (False, f"{agg}_Unserved_Energy", scenario_list), (False, f"{agg}_Net_Interchange", scenario_list), ] # Runs get_data to populate mplot_data_dict with all required properties, returns a 1 if required data is missing return self.get_data( self.mplot_data_dict, properties, self.Marmot_Solutions_folder ) outputs: dict = {} if facet: check_input_data = set_dicts(self.Scenarios) else: check_input_data = set_dicts([self.Scenarios[0]]) properties = [(True, f"{agg}_{agg}s_Net_Interchange", self.Scenarios)] # Runs get_formatted_data within PlotDataStoreAndProcessor to populate PlotDataStoreAndProcessor dictionary # with all required properties, returns a 1 if required data is missing check_input_data = self.get_formatted_data(properties) if 1 in check_input_data: return self.MissingInputData() # here we get the hour and season groups? categorize_hours = True if categorize_hours: scenario_0 = self.mplot_data_dict[f"{agg}_{agg}s_Net_Interchange"].get( self.Scenarios[0] ) tmpstmps = scenario_0.reset_index()["timestamp"].unique() tmp_df = pd.DataFrame({"timestamp": tmpstmps}) # get to a df tmp_df.timestamp = pd.to_datetime( tmp_df.timestamp ) # ensure are as datetimes tmp_df["hour"] = tmp_df["timestamp"].dt.hour # pull hour begin_onpeak = 8 end_onpeak = 12 tmp_df["cat"] = tmp_df["hour"].apply( lambda x: f"on-peak (HB {begin_onpeak}-{end_onpeak})" if begin_onpeak < x <= end_onpeak else "off-peak" ) # make categories as desired # here we can record the system-wide net load by scenario # for scenario in self.Scenarios: # try: # all_gen = self.mplot_data_dict['generator_Generation'].get(scenario).copy() # if self.shift_leapday is True: # all_gen = self.shift_leapday(all_gen,self.Marmot_Solutions_folder) # # Stacked_Gen = Stacked_Gen.xs(zone_input,level=self.AGG_BY) # except KeyError: # self.logger.warning(f'No generation... probably some problem') # out = MissingZoneData() # return out # for z in all_gen.index.get_level_values(self.AGG_BY).unique(): # print(z) # print(self._grab_zone_net_load(z,scenario)) # print(type(self._grab_zone_net_load(z,scenario))) # try to concat? Use scenario pointers? 
        # Split the props to see if we want net load
        use_net_load = False
        interconnect_net_load = False
        if prop[0] == "Load" and len(prop) > 1:
            for n in range(1, len(prop)):
                if prop[n] == "Net" or prop[n] == "net":
                    use_net_load = True
                elif prop[n] == "all" or prop[n] == "All":
                    interconnect_net_load = True

        # Grab interchange for each zone
        for zone_input in self.Zones:
            logger.info(f"Zone = {zone_input}")
            data_table_chunks = []

            for scenario in self.Scenarios:
                if use_net_load:
                    net_load, label_addenda = self._grab_zone_net_load(
                        zone_input, scenario, interconnect_aggby=interconnect_net_load
                    )

                rr_int = self[f"{agg}_{agg}s_Net_Interchange"].get(scenario)
                if shift_leapday:
                    rr_int = adjust_for_leapday(rr_int)

                # May need to check agg here if not zone or region
                if self.AGG_BY != "region" and self.AGG_BY != "zone":
                    agg_region_mapping = (
                        self.region_mapping[["region", self.AGG_BY]]
                        .set_index("region")
                        .to_dict()[self.AGG_BY]
                    )
                    # Checks if keys all aggregate to a single value,
                    # this plot requires multiple values to work
                    if len(set(agg_region_mapping.values())) == 1:
                        return UnsupportedAggregation()
                    rr_int = rr_int.reset_index()
                    rr_int["parent"] = rr_int["parent"].map(agg_region_mapping)
                    rr_int["child"] = rr_int["child"].map(agg_region_mapping)

                rr_int_agg = rr_int.groupby(
                    ["timestamp", "parent", "child"], as_index=True
                ).sum()
                rr_int_agg.rename(columns={"values": "flow (MW)"}, inplace=True)
                rr_int_agg = rr_int_agg.reset_index()

                parent_region = [zone_input]
                for parent in parent_region:
                    single_parent = rr_int_agg[rr_int_agg["parent"] == parent]
                    single_parent = single_parent.pivot(
                        index="timestamp", columns="child", values="flow (MW)"
                    )
                    # Remove all 0 columns (uninteresting).
                    single_parent = single_parent.loc[
                        :, (single_parent != 0).any(axis=0)
                    ]
                    if parent in single_parent.columns:
                        # Remove columns if parent = child
                        single_parent = single_parent.drop(columns=[parent])
                    # single_parent = single_parent / unitconversion['divisor']

                    scenario_names = pd.Series(
                        [scenario] * len(single_parent), name="Scenario"
                    )
                    # data_table = single_parent.add_suffix(f" ({unitconversion['units']})")
                    data_table = single_parent.set_index([scenario_names], append=True)

                # Now we can match against something. Eventually this should
                # be able to aggregate all zones and use attributes other
                # than load.
                if prop[0] in ("Load", "Unserved_Energy", "Net_Interchange"):
                    lookup_label = f"{agg}_{prop[0]}"
                else:
                    lookup_label = f"generator_{prop[0]}"
                logger.debug(f"{lookup_label}, {scenario}")

                if "generator" in lookup_label:
                    Load = self[lookup_label].get(scenario).copy()
                    Load.reset_index(inplace=True)
                    Load.set_index(["timestamp", self.AGG_BY], inplace=True)
                    assert prop[1] in Load["tech"].unique()
                    Load = Load[Load["tech"] == prop[1]].copy()
                    Load = Load["values"].copy()
                    lookup_label += f"_{prop[1]}"
                else:
                    Load = self[lookup_label].get(scenario).copy()

                if shift_leapday:
                    Load = adjust_for_leapday(Load)

                if prop[0] == "Net_Interchange":
                    # Want a second prop to tell what to replace?
                    Load = Load.xs("PJM-W", level=self.AGG_BY)
                    lookup_label = lookup_label.replace("region", "PJM-W")
                    logger.debug(f"lookup_label is {lookup_label}")
                else:
                    Load = Load.xs(zone_input, level=self.AGG_BY)
                Load = Load.groupby(["timestamp"]).sum()
                # Could do a comparison here that allows switching to net load
                if use_net_load:
                    data_table["sort_attribute"] = net_load.tolist()
                else:
                    data_table["sort_attribute"] = Load.squeeze().tolist()
                    label_addenda = zone_input

                data_table_chunks.append(data_table)
                # sorted_data_table = data_table.sort_values(['sort_attribute'], ascending=False)
                # data_avgs.append(sorted_data_table.mean(axis=0))
                # data_table_chunks.append(
                #     sorted_data_table.iloc[:int(len(sorted_data_table.index) * top_pct), :]
                # )

                # Can we recategorize?
                if categorize_hours and use_net_load:
                    tmp_df["net_load"] = net_load.tolist()
                    tmp_df["pct_rank"] = tmp_df["net_load"].rank(pct=True)
                    pct_cut = 0.98
                    tmp_df["cat"] = [
                        f"RA hour (top {int(100 * (1 - pct_cut))}% of "
                        f"{label_addenda} net load hours)"
                        if a > pct_cut
                        else "non-RA hour"
                        for a in tmp_df["pct_rank"]
                    ]

            zone_interchange_timeseries = pd.concat(
                data_table_chunks, copy=False, axis=0
            )

            # Create and bank unit conversions
            line_unitconversion = self.capacity_energy_unitconversion(
                zone_interchange_timeseries.iloc[:, :-1].abs().values.max(),
                self.Scenarios,
            )
            zone_interchange_timeseries.iloc[:, :-1] = (
                zone_interchange_timeseries.iloc[:, :-1]
                / line_unitconversion["divisor"]
            )
            # zone_interchange_avgs = pd.concat(data_avgs, copy=False, axis=0)
            attr_unitconversion = self.capacity_energy_unitconversion(
                zone_interchange_timeseries.iloc[:, -1].abs().values.max(),
                self.Scenarios,
            )
            zone_interchange_timeseries.iloc[:, -1] = (
                zone_interchange_timeseries.iloc[:, -1]
                / attr_unitconversion["divisor"]
            )

            # Make a facet plot, one panel for each child zone.
            plot_number = len(zone_interchange_timeseries.columns) - 1
            ncols, nrows = set_x_y_dimension(plot_number)
            grid_size = ncols * nrows
            excess_axs = grid_size - plot_number

            mplt = PlotLibrary(
                nrows, ncols, sharey=False, squeeze=False, ravel_axs=True
            )
            fig3, axs = mplt.get_figure()
            plt.subplots_adjust(wspace=0.6, hspace=0.7)

            # Flag for whether the hour categories set the scatter color
            color_list = self.color_list

            # Plot each child zone as a scatter panel
            for n, column in enumerate(zone_interchange_timeseries.columns[:-1]):
                if categorize_hours:
                    # Join the hour categories onto the timeseries
                    zone_interchange_timeseries_wcategories = (
                        zone_interchange_timeseries.join(tmp_df.set_index("timestamp"))
                    )
                    categories = list(
                        zone_interchange_timeseries_wcategories["cat"].unique()
                    )
                    color_dict = dict(zip(categories, color_list))
                    axs[n].scatter(
                        zone_interchange_timeseries_wcategories[column],
                        zone_interchange_timeseries_wcategories["sort_attribute"],
                        c=zone_interchange_timeseries_wcategories["cat"].map(
                            color_dict
                        ),
                        s=2,
                        alpha=0.05,
                        rasterized=True,
                    )
                    axs[n].set_xlabel(
                        f"{zone_input} to {column} Flow "
                        f"({line_unitconversion['units']})",
                        fontsize=12,
                    )
                    if use_net_load and "net" not in lookup_label:
                        split_label = lookup_label.split("_")
                        split_label.insert(1, "net")
                        lookup_label = "_".join(split_label)
                    axs[n].set_ylabel(
                        f"{lookup_label} - {label_addenda} - "
                        f"({attr_unitconversion['units']})",
                        fontsize=12,
                    )
                    d1 = str(tmp_df.at[tmp_df.index[0], "timestamp"])
                    d2 = str(tmp_df.at[tmp_df.index[len(tmp_df.index) - 1], "timestamp"])
                    date1 = d1.split(" ")[0]
                    date2 = d2.split(" ")[0]
                    axs[n].set_title(
                        f"5min intervals for {date1} - {date2} plotted", fontsize=12
                    )
                    # Subset before doing polyfit
                    for cat in categories:
                        zone_interchange_timeseries_wcategories_subset = (
                            zone_interchange_timeseries_wcategories[
                                zone_interchange_timeseries_wcategories["cat"] == cat
                            ].copy()
                        )
                        z = np.polyfit(
                            zone_interchange_timeseries_wcategories_subset[column],
                            zone_interchange_timeseries_wcategories_subset[
                                "sort_attribute"
                            ],
                            1,
                        )
                        p = np.poly1d(z)
                        axs[n].plot(
                            zone_interchange_timeseries_wcategories_subset[column],
                            p(zone_interchange_timeseries_wcategories_subset[column]),
                            c=color_dict[cat],
                            label=cat,
                        )
                    axs[n].legend(
                        loc="best", facecolor="inherit", frameon=False, fontsize=8
                    )
                else:
                    axs[n].scatter(
                        zone_interchange_timeseries[column],
                        zone_interchange_timeseries["sort_attribute"],
                        c="r",
                        s=2,
                        alpha=0.05,
                    )
                    axs[n].set_xlabel(
                        f"{zone_input} to {column} Flow "
                        f"({line_unitconversion['units']})",
                        fontsize=12,
                    )
                    axs[n].set_ylabel(
                        f"{lookup_label} ({attr_unitconversion['units']})", fontsize=12
                    )
                    z = np.polyfit(
                        zone_interchange_timeseries[column],
                        zone_interchange_timeseries["sort_attribute"],
                        1,
                    )
                    p = np.poly1d(z)
                    axs[n].plot(
                        zone_interchange_timeseries[column],
                        p(zone_interchange_timeseries[column]),
                        "r--",
                    )

            # TODO: season split, how to mark lines, legend, T/F input for
            # using categorize_hours, iteration through wind, solar, load, etc.
            # with single input; this actually seems too complicated for now.
            # Then iterate across the child zones, plotting relevant info:
            # for n, column in enumerate(zone_interchange_timeseries.columns[:-1]):
            #     self.create_line_plot(axs, zone_interchange_timeseries.reset_index(), column, label=column, n=n)
            #     ax2 = axs[n].twinx()
            #     ax2.plot(zone_interchange_timeseries.reset_index()['sort_attribute'], color='red', label='sort_attribute')
            #     axs[n].axhline(y=zone_interchange_avgs[column], ls='--', label=f'{column} avg', color='blue')
            #     ax2.axhline(y=zone_interchange_avgs['sort_attribute'], ls='--', label='attr avg', color='red')
            #     axs[n].set_title(f"{zone_input} to {column}")
            #     axs[n].yaxis.set_major_formatter(mpl.ticker.FuncFormatter(
            #         lambda x, p: format(x, f',.{self.y_axes_decimalpt}f')))
            #     axs[n].margins(x=0.01)
            #     axs[n].set_xlabel('Rank in timeseries')
            #     axs[n].set_ylabel('Flow (MW)', color='blue', fontsize=12)
            #     ax2.set_ylabel("Load (MW)", color="red", fontsize=12)
            #     axs[n].set_xlim([0, int(len(zone_interchange_timeseries.index) * 1.2)])  # extend
            #     ax2.set_xlim([0, int(len(zone_interchange_timeseries.index) * 1.2)])  # extend
            #     # add labels
            #     for line, label in zip(axs[n].lines, ax1_labs):
            #         y = line.get_ydata()[-1]
            #         x = line.get_xdata()[-1]
            #         axs[n].annotate(label, xy=(1, y), xytext=(.82 * x, 0), color=line.get_color(),
            #                         xycoords=axs[n].get_yaxis_transform(), textcoords="offset points",
            #                         size=12, va="center")
            #     for line, label in zip(ax2.lines, ax2_labs):
            #         y = line.get_ydata()[-1]
            #         x = line.get_xdata()[-1]
            #         ax2.annotate(label, xy=(.8, y), xytext=(.82 * x, 0), color=line.get_color(),
            #                      xycoords=ax2.get_yaxis_transform(), textcoords="offset points",
            #                      size=12, va="center")
            #     axs[n].legend(loc='lower left', bbox_to_anchor=(1, 0), facecolor='inherit', frameon=True)

            # Clean empty axes and resize
            if excess_axs != 0:
                mplt.remove_excess_axs(excess_axs, grid_size)

            outputs[zone_input] = {
                "fig": fig3,
                "data_table": zone_interchange_timeseries,
            }

        return outputs

    def _overwrite_line_limits_from_minmax_flows(
        self, scenario, overwrite_properties=["line_Import_Limit", "line_Export_Limit"]
    ):
        """Overwrites line_Import_Limit and line_Export_Limit for a given
        scenario with the max and min flows (respectively) on each line.

        Min flows are negative by convention.

        Args:
            scenario (str): Name of scenario with missing export/import limits
                to overwrite.
            overwrite_properties (list, optional): Column names of
                export/import limits to be overwritten. Defaults to
                ["line_Import_Limit", "line_Export_Limit"].

        Returns:
            tuple: Two pd.DataFrames holding timeseries of the new export and
            import limits for each line.
        """
        for prop_name in overwrite_properties:
            logger.warning(
                f"{prop_name} is being overwritten in {scenario} in this "
                "method by using line_Flow min/max"
            )

        flow_reset_df = self["line_Flow"][scenario].reset_index()
        flow_maxes = flow_reset_df.groupby(["line_name"]).max()
        flow_mins = flow_reset_df.groupby(["line_name"]).min()

        maxes_merge = flow_reset_df.merge(flow_maxes, on="line_name", how="left")
        maxes_merge = maxes_merge[["timestamp_x", "line_name", "values_y"]]  # subset
        maxes_merge.rename(
            columns={"timestamp_x": "timestamp", "values_y": "values"}, inplace=True
        )
        maxes_merge.set_index(["timestamp", "line_name"], inplace=True)

        mins_merge = flow_reset_df.merge(flow_mins, on="line_name", how="left")
        mins_merge = mins_merge[["timestamp_x", "line_name", "values_y"]]  # subset
        mins_merge.rename(
            columns={"timestamp_x": "timestamp", "values_y": "values"}, inplace=True
        )
        mins_merge.set_index(["timestamp", "line_name"], inplace=True)

        return (maxes_merge, mins_merge)
    def extract_tx_cap(
        self,
        figure_name=None,
        prop=None,
        start=None,
        end=None,
        timezone=None,
        start_date_range=None,
        end_date_range=None,
        unique_label_limit=15,
    ):
        # TODO: Needs finishing
        outputs: dict = {}

        properties = [
            (True, "line_Import_Limit", self.Scenarios),
            (True, "line_Export_Limit", self.Scenarios),
        ]
        properties_lines = [(True, "line_Flow", self.Scenarios)]

        # Runs get_formatted_data within PlotDataStoreAndProcessor to populate
        # PlotDataStoreAndProcessor dictionary with all required properties,
        # returns a 1 if required data is missing
        check_input_data = self.get_formatted_data(properties_lines)
        overwrite_input_data = self.get_formatted_data(properties)
        if 1 in check_input_data:
            return MissingInputData()

        for scenario in self.Scenarios:
            if 1 in overwrite_input_data:
                import_lim, export_lim = self._overwrite_line_limits_from_minmax_flows(
                    scenario, overwrite_properties=[tup[1] for tup in properties]
                )
            else:
                import_lim = self["line_Import_Limit"][scenario].reset_index()
                export_lim = self["line_Export_Limit"][scenario].reset_index()

            grouped_import_lims = import_lim.groupby(["line_name"]).mean().reset_index()
            grouped_export_lims = export_lim.groupby(["line_name"]).mean().reset_index()

            # Convert values to appropriate units
            unitconversion = self.capacity_energy_unitconversion(
                max(
                    grouped_import_lims["values"].values.max(),
                    grouped_export_lims["values"].values.max(),
                ),
                self.Scenarios,
            )

            for zone_input in self.Zones:
                logger.info(f"Zone = {zone_input}")

                lines = self.meta.region_interregionallines()
                lines = lines[lines[self.AGG_BY] == zone_input]

                import_line_cap = lines.merge(
                    grouped_import_lims, how="inner", on="line_name"
                )
                import_line_cap = import_line_cap[
                    [self.AGG_BY, "line_name", "values"]
                ]  # .set_index('region')
                import_line_cap["values"] = import_line_cap["values"].div(
                    unitconversion["divisor"]
                )

                export_line_cap = lines.merge(
                    grouped_export_lims, how="inner", on="line_name"
                )
                export_line_cap = export_line_cap[[self.AGG_BY, "line_name", "values"]]
                export_line_cap["values"] = export_line_cap["values"].div(
                    unitconversion["divisor"]
                )

                data_table_out = pd.concat([import_line_cap, export_line_cap])
                data_table_out.set_index(self.AGG_BY, inplace=True)
                data_table_out.columns = [
                    "line_name",
                    f"line_capacity ({unitconversion['units']})",
                ]
                data_table_out["type"] = [
                    "import" if v < 0 else "export"
                    for v in data_table_out[
                        f"line_capacity ({unitconversion['units']})"
                    ]
                ]

                # Subgroup if too many lines for plotting
                if (
                    len(import_line_cap.index) > unique_label_limit
                    or len(export_line_cap.index) > unique_label_limit
                ):
                    logger.info(
                        f"More than {unique_label_limit} lines in {zone_input}, "
                        "so grouping by voltage label for lines with kVs"
                    )
                    import_line_cap["line_name"] = import_line_cap["line_name"].map(
                        lambda a: re.sub(r"^([^_]*_){2}", "", a) if "kV" in a else a
                    )
                    import_line_cap = (
                        import_line_cap.groupby([self.AGG_BY, "line_name"])
                        .sum()
                        .reset_index()
                    )
                    export_line_cap["line_name"] = export_line_cap["line_name"].map(
                        lambda a: re.sub(r"^([^_]*_){2}", "", a) if "kV" in a else a
                    )
                    export_line_cap = (
                        export_line_cap.groupby([self.AGG_BY, "line_name"])
                        .sum()
                        .reset_index()
                    )

                import_line_cap = import_line_cap.pivot(
                    index=self.AGG_BY, columns="line_name", values="values"
                )
                export_line_cap = export_line_cap.pivot(
                    index=self.AGG_BY, columns="line_name", values="values"
                )

                # Get unique names of lines to create custom legend
                l1 = import_line_cap.columns.tolist()
                l2 = export_line_cap.columns.tolist()
                l1.extend(l2)
                labels = np.unique(np.array(l1)).tolist()

                # Make a colormap for each unique line
                cmap = plt.cm.get_cmap(lut=len(labels))
                colors = [mpl.colors.rgb2hex(cmap(i)) for i in range(cmap.N)]
                color_dict = dict(zip(labels, colors))

                fig1, axs = plt.subplots(1, 2, figsize=(xdimension, ydimension))
                plt.subplots_adjust(wspace=0.35, hspace=0.2)
                axs = axs.ravel()

                # Left panel: import capacity
                # (eventually we want to stack plot capacity by scenario)
                import_line_cap.plot.bar(
                    stacked=True,
                    figsize=(xdimension, ydimension),
                    rot=0,
                    ax=axs[0],
                    color=[color_dict[x] for x in import_line_cap.columns],
                    edgecolor="black",
                    linewidth=0.1,
                )
                axs[0].spines["right"].set_visible(False)
                axs[0].spines["top"].set_visible(False)
                axs[0].set_ylabel(
                    f"Import capacity in region ({unitconversion['units']})",
                    color="black",
                    rotation="vertical",
                )
                axs[0].yaxis.set_major_formatter(
                    mpl.ticker.FuncFormatter(
                        lambda x, p: format(x, f",.{self.y_axes_decimalpt}f")
                    )
                )
                axs[0].tick_params(axis="y", which="major", length=5, width=1)
                axs[0].tick_params(axis="x", which="major", length=5, width=1)
                axs[0].get_legend().remove()
                if plot_data_settings["plot_title_as_region"]:
                    axs[0].set_title(zone_input)

                # Right panel: export capacity
                export_line_cap.plot.bar(
                    stacked=True,
                    figsize=(xdimension, ydimension),
                    rot=0,
                    ax=axs[1],
                    color=[color_dict[x] for x in export_line_cap.columns],
                    edgecolor="black",
                    linewidth=0.1,
                )
                axs[1].spines["right"].set_visible(False)
                axs[1].spines["top"].set_visible(False)
                axs[1].set_ylabel(
                    f"Export capacity in region ({unitconversion['units']})",
                    color="black",
                    rotation="vertical",
                )
                axs[1].yaxis.set_major_formatter(
                    mpl.ticker.FuncFormatter(
                        lambda x, p: format(x, f",.{self.y_axes_decimalpt}f")
                    )
                )
                axs[1].tick_params(axis="y", which="major", length=5, width=1)
                axs[1].tick_params(axis="x", which="major", length=5, width=1)
                if plot_data_settings["plot_title_as_region"]:
                    axs[1].set_title(zone_input)

                # Create custom line legend
                handles = []
                for line in labels:
                    line_legend = mpl.patches.Patch(
                        facecolor=color_dict[line], alpha=1.0
                    )
                    handles.append(line_legend)
                axs[1].legend(
                    reversed(handles),
                    reversed(labels),
                    loc="lower left",
                    bbox_to_anchor=(1.05, 0),
                    facecolor="inherit",
                    frameon=True,
                )

                # Add labels to panels
                axs[0].set_title(
                    "A.", fontdict={"weight": "bold", "size": 11}, loc="left", pad=4
                )
                axs[1].set_title(
                    "B.", fontdict={"weight": "bold", "size": 11}, loc="left", pad=4
                )

                fig1.add_subplot(111, frameon=False)
                plt.tick_params(
                    labelcolor="none", top=False, bottom=False, left=False, right=False
                )

                outputs[zone_input] = {"fig": fig1, "data_table": data_table_out}

        return outputs
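    # Illustrative sketch (not part of the original module): the voltage-label
    # grouping in extract_tx_cap relies on re.sub stripping the first two
    # underscore-delimited tokens from any line name containing "kV", so
    # parallel circuits collapse onto one legend label. The line names below
    # are hypothetical:
    #
    #   >>> import re
    #   >>> re.sub(r"^([^_]*_){2}", "", "RegionA_RegionB_230kV_ckt1")
    #   '230kV_ckt1'
    #   >>> re.sub(r"^([^_]*_){2}", "", "RegionC_RegionB_230kV_ckt1")
    #   '230kV_ckt1'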
    def region_region_interchange_all_scenarios(self, **kwargs):
        """Creates a timeseries line plot of interchange flows between the
        selected region and each connecting region.

        If there are more than 4 total interchanges, all other interchanges
        are aggregated into an 'other' grouping.

        Each scenario is plotted on a separate facet plot.

        Figures and data tables are returned to plot_main.

        Returns:
            dict: Dictionary containing the created plot and its data table.
        """
        outputs = self._region_region_interchange(self.Scenarios, **kwargs)
        return outputs
    def region_region_interchange_all_regions(self, **kwargs):
        """Creates a timeseries line plot of interchange flows between the
        selected region and each connecting region.

        All regions are plotted on a single figure, with each focus region
        placed on a separate facet plot.

        If there are more than 4 total interchanges, all other interchanges
        are aggregated into an 'other' grouping.

        This figure only plots a single scenario, defined by
        Main_scenario_plot in user_defined_inputs.csv.

        Figures and data tables are saved within the method.

        Returns:
            DataSavedInModule: DataSavedInModule exception.
        """
        outputs = self._region_region_interchange(
            [self.Scenarios[0]], plot_scenario=False, **kwargs
        )
        return outputs
    def _region_region_interchange(
        self,
        scenario_type: List[str],
        plot_scenario: bool = True,
        timezone: str = "",
        **_,
    ):
        """Creates a timeseries line plot of interchange flows between the
        selected region and each connecting region.

        Args:
            scenario_type (List[str]): List of scenarios to plot.
            plot_scenario (bool, optional): If True, each scenario gets its
                own facet and results are returned to plot_main; if False,
                each region gets its own facet and results are saved within
                the method. Defaults to True.
            timezone (str, optional): The timezone to display on the x-axes.
                Defaults to "".

        Returns:
            dict: Dictionary containing the created plot and its data table.
        """
        outputs: dict = {}

        if self.AGG_BY == "zone":
            agg = "zone"
        else:
            agg = "region"

        duration_curve = False
        # if 'duration_curve' in figure_name:
        #     duration_curve = True

        # List of properties needed by the plot, properties are a set of
        # tuples and contain 3 parts: required True/False, property name and
        # scenarios required, scenarios must be a list.
        properties = [(True, f"{agg}_{agg}s_Net_Interchange", scenario_type)]

        # Runs get_formatted_data within PlotDataStoreAndProcessor to populate
        # PlotDataStoreAndProcessor dictionary with all required properties,
        # returns a 1 if required data is missing
        check_input_data = self.get_formatted_data(properties)
        if 1 in check_input_data:
            return MissingInputData()

        for zone_input in self.Zones:
            logger.info(f"Zone = {zone_input}")

            ncols, nrows = set_facet_col_row_dimensions(
                self.xlabels, self.ylabels, multi_scenario=scenario_type
            )
            mplt = PlotLibrary(nrows, ncols, sharey=True, squeeze=False, ravel_axs=True)
            fig, axs = mplt.get_figure()
            plt.subplots_adjust(wspace=0.6, hspace=0.3)

            data_table_chunks = []
            n = 0
            for scenario in scenario_type:
                rr_int = self[f"{agg}_{agg}s_Net_Interchange"].get(scenario)
                if shift_leapday:
                    rr_int = adjust_for_leapday(rr_int)

                # For plot_main handling - need to find better solution
                if not plot_scenario:
                    outputs = {}
                    for zone_input in self.Zones:
                        outputs[zone_input] = pd.DataFrame()

                if self.AGG_BY != "region" and self.AGG_BY != "zone":
                    agg_region_mapping = (
                        self.region_mapping[["region", self.AGG_BY]]
                        .set_index("region")
                        .to_dict()[self.AGG_BY]
                    )
                    # Checks if keys all aggregate to a single value,
                    # this plot requires multiple values to work
                    if len(set(agg_region_mapping.values())) == 1:
                        return UnsupportedAggregation()
                    rr_int = rr_int.reset_index()
                    rr_int["parent"] = rr_int["parent"].map(agg_region_mapping)
                    rr_int["child"] = rr_int["child"].map(agg_region_mapping)

                rr_int_agg = rr_int.groupby(
                    ["timestamp", "parent", "child"], as_index=True
                ).sum()
                rr_int_agg.rename(columns={"values": "flow (MW)"}, inplace=True)
                rr_int_agg = rr_int_agg.reset_index()

                # If plotting all regions update plot setup
                if not plot_scenario:
                    # Make a facet plot, one panel for each parent zone.
                    parent_region = rr_int_agg["parent"].unique()
                    plot_number = len(parent_region)
                    ncols, nrows = set_x_y_dimension(plot_number)
                    mplt = PlotLibrary(nrows, ncols, squeeze=False, ravel_axs=True)
                    fig, axs = mplt.get_figure()
                    plt.subplots_adjust(wspace=0.6, hspace=0.7)
                else:
                    parent_region = [zone_input]
                    plot_number = len(scenario_type)

                grid_size = ncols * nrows
                excess_axs = grid_size - plot_number

                for parent in parent_region:
                    single_parent = rr_int_agg[rr_int_agg["parent"] == parent]
                    single_parent = single_parent.pivot(
                        index="timestamp", columns="child", values="flow (MW)"
                    )
                    # Remove all 0 columns (uninteresting).
                    single_parent = single_parent.loc[
                        :, (single_parent != 0).any(axis=0)
                    ]
                    if parent in single_parent.columns:
                        # Remove columns if parent = child
                        single_parent = single_parent.drop(columns=[parent])

                    # Neaten up lines: if more than 4 total interchanges,
                    # aggregate all but the highest 3.
                    if len(single_parent.columns) > 4:
                        # Keep the three highest zonal interchanges for all
                        # scenarios.
                        cols_dontagg = (
                            single_parent.max()
                            .abs()
                            .sort_values(ascending=False)[0:3]
                            .index
                        )
                        df_dontagg = single_parent[cols_dontagg].copy()
                        df_toagg = single_parent.drop(columns=cols_dontagg)
                        agged = df_toagg.sum(axis=1)
                        df_dontagg.insert(len(df_dontagg.columns), "Other", agged)
                        single_parent = df_dontagg.copy()

                    # Convert units
                    if n == 0:
                        unitconversion = self.capacity_energy_unitconversion(
                            single_parent, self.Scenarios
                        )
                    single_parent = single_parent / unitconversion["divisor"]

                    for column in single_parent.columns:
                        mplt.lineplot(single_parent, column, label=column, sub_pos=n)
                    axs[n].set_title(parent)
                    axs[n].margins(x=0.01)
                    mplt.set_subplot_timeseries_format(sub_pos=n)
                    # Add horizontal line at 0.
                    axs[n].hlines(
                        y=0,
                        xmin=axs[n].get_xlim()[0],
                        xmax=axs[n].get_xlim()[1],
                        linestyle=":",
                    )
                    axs[n].legend(loc="lower left", bbox_to_anchor=(1, 0))
                    n += 1

                # Create data table for each scenario
                scenario_names = pd.Series(
                    [scenario] * len(single_parent), name="Scenario"
                )
                data_table = single_parent.add_suffix(f" ({unitconversion['units']})")
                data_table = data_table.set_index([scenario_names], append=True)
                data_table_chunks.append(data_table)

            # If plotting all scenarios add facet labels
            if plot_scenario is True:
                mplt.add_facet_labels(xlabels=self.xlabels, ylabels=self.ylabels)

            # Remove extra axes
            mplt.remove_excess_axs(excess_axs, grid_size)

            plt.xlabel(timezone, color="black", rotation="horizontal", labelpad=30)
            plt.ylabel(
                f"Net Interchange ({unitconversion['units']})",
                color="black",
                rotation="vertical",
                labelpad=40,
            )

            # If plotting all regions save output and return none to plot_main
            if not plot_scenario:
                data_table_out = rr_int_agg
                save_figures = self.figure_folder.joinpath(
                    self.AGG_BY + "_transmission"
                )
                fig.savefig(
                    save_figures.joinpath(
                        "Region_Region_Interchange_{}.svg".format(self.Scenarios[0])
                    ),
                    dpi=600,
                    bbox_inches="tight",
                )
                data_table_out.to_csv(
                    save_figures.joinpath(
                        "Region_Region_Interchange_{}.csv".format(self.Scenarios[0])
                    )
                )
                outputs = DataSavedInModule()
                return outputs

            Data_Out = pd.concat(data_table_chunks, copy=False, axis=0)
            # If plotting all scenarios return figures to plot_main
            outputs[zone_input] = {"fig": fig, "data_table": Data_Out}

        return outputs
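    # Illustrative sketch (not part of the original module): when AGG_BY is a
    # custom aggregation, parent and child regions are remapped through
    # region_mapping before the groupby above. With a hypothetical two-column
    # mapping:
    #
    #   >>> import pandas as pd
    #   >>> region_mapping = pd.DataFrame(
    #   ...     {"region": ["r1", "r2", "r3"], "Country": ["A", "A", "B"]}
    #   ... )
    #   >>> agg_map = region_mapping.set_index("region").to_dict()["Country"]
    #   >>> agg_map["r2"]
    #   'A'
    #
    # Flows between r1 and r2 then map to A->A and are dropped later when
    # parent == child columns are removed.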
    def region_region_checkerboard(self, **_):
        """Creates a checkerboard/heatmap figure showing total interchanges
        between regions/zones.

        Each scenario is plotted on its own facet plot.
        Plots and data are saved within the module.

        Returns:
            DataSavedInModule: DataSavedInModule exception.
        """
        outputs: dict = {}

        if self.AGG_BY == "zone":
            agg = "zone"
        else:
            agg = "region"

        # List of properties needed by the plot, properties are a set of
        # tuples and contain 3 parts: required True/False, property name and
        # scenarios required, scenarios must be a list.
        properties = [(True, f"{agg}_{agg}s_Net_Interchange", self.Scenarios)]

        # Runs get_formatted_data within PlotDataStoreAndProcessor to populate
        # PlotDataStoreAndProcessor dictionary with all required properties,
        # returns a 1 if required data is missing
        check_input_data = self.get_formatted_data(properties)
        if 1 in check_input_data:
            return MissingInputData()

        ncols, nrows = set_x_y_dimension(len(self.Scenarios))
        grid_size = ncols * nrows
        excess_axs = grid_size - len(self.Scenarios)

        mplt = PlotLibrary(nrows, ncols, squeeze=False, ravel_axs=True)
        fig, axs = mplt.get_figure()
        plt.subplots_adjust(wspace=0.02, hspace=0.4)

        max_flow_group = []
        Data_Out = []
        n = 0
        for scenario in self.Scenarios:
            rr_int = self[f"{agg}_{agg}s_Net_Interchange"].get(scenario)
            if shift_leapday:
                rr_int = adjust_for_leapday(rr_int)

            if self.AGG_BY != "region" and self.AGG_BY != "zone":
                agg_region_mapping = (
                    self.region_mapping[["region", self.AGG_BY]]
                    .set_index("region")
                    .to_dict()[self.AGG_BY]
                )
                # Checks if keys all aggregate to a single value,
                # this plot requires multiple values to work
                if len(set(agg_region_mapping.values())) == 1:
                    return UnsupportedAggregation()
                rr_int = rr_int.reset_index()
                rr_int["parent"] = rr_int["parent"].map(agg_region_mapping)
                rr_int["child"] = rr_int["child"].map(agg_region_mapping)

            rr_int_agg = rr_int.groupby(["parent", "child"], as_index=True).sum()
            rr_int_agg.rename(columns={"values": "flow (MW)"}, inplace=True)
            # Keep only positive flows
            rr_int_agg = rr_int_agg.loc[rr_int_agg["flow (MW)"] > 0.01]
            rr_int_agg = rr_int_agg.sort_values(ascending=False, by="flow (MW)")
            rr_int_agg = rr_int_agg / 1000  # MWh -> GWh

            data_out = rr_int_agg.copy()
            data_out.rename(
                columns={"flow (MW)": "{} flow (GWh)".format(scenario)}, inplace=True
            )

            max_flow = max(rr_int_agg["flow (MW)"])
            rr_int_agg = rr_int_agg.unstack("child")
            rr_int_agg = rr_int_agg.droplevel(level=0, axis=1)

            current_cmap = plt.cm.get_cmap()
            current_cmap.set_bad(color="grey")

            axs[n].imshow(rr_int_agg)
            axs[n].set_xticks(np.arange(rr_int_agg.shape[1]))
            axs[n].set_yticks(np.arange(rr_int_agg.shape[0]))
            axs[n].set_xticklabels(rr_int_agg.columns)
            axs[n].set_yticklabels(rr_int_agg.index)
            axs[n].set_title(scenario.replace("_", " "), fontweight="bold")

            # Rotate the tick labels and set their alignment.
            plt.setp(
                axs[n].get_xticklabels(),
                rotation=90,
                ha="right",
                rotation_mode="anchor",
            )

            # Delineate the boxes and make room at top and bottom
            axs[n].set_xticks(np.arange(rr_int_agg.shape[1] + 1) - 0.5, minor=True)
            axs[n].set_yticks(np.arange(rr_int_agg.shape[0] + 1) - 0.5, minor=True)
            axs[n].grid(which="minor", color="k", linestyle="-", linewidth=1)
            axs[n].tick_params(which="minor", bottom=False, left=False)

            max_flow_group.append(max_flow)
            Data_Out.append(data_out)
            n += 1

        # Remove extra axes
        mplt.remove_excess_axs(excess_axs, grid_size)

        cmap = cm.inferno
        norm = mcolors.Normalize(vmin=0, vmax=max(max_flow_group))
        cax = plt.axes([0.90, 0.1, 0.035, 0.8])
        fig.colorbar(
            cm.ScalarMappable(norm=norm, cmap=cmap),
            cax=cax,
            label="Total Net Interchange [GWh]",
        )
        plt.xlabel("To Region", color="black", rotation="horizontal", labelpad=40)
        plt.ylabel("From Region", color="black", rotation="vertical", labelpad=40)

        data_table_out = pd.concat(Data_Out, axis=1)
        save_figures = self.figure_folder.joinpath(f"{self.AGG_BY}_transmission")
        fig.savefig(
            save_figures.joinpath("region_region_checkerboard.svg"),
            dpi=600,
            bbox_inches="tight",
        )
        data_table_out.to_csv(save_figures.joinpath("region_region_checkerboard.csv"))

        outputs = DataSavedInModule()
        return outputs
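    # Illustrative sketch (not part of the original module): the unstack call
    # in region_region_checkerboard pivots the summed parent/child flows into
    # the matrix rendered by imshow, with NaNs (shown grey) where no
    # interchange exists. With hypothetical two-region data:
    #
    #   >>> import pandas as pd
    #   >>> flows = pd.DataFrame(
    #   ...     {"parent": ["A", "B"], "child": ["B", "A"], "flow (MW)": [5.0, 3.0]}
    #   ... ).set_index(["parent", "child"])
    #   >>> float(flows.unstack("child").droplevel(level=0, axis=1).loc["A", "B"])
    #   5.0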
    def line_violations_timeseries(self, **kwargs):
        """Creates a timeseries line plot of lineflow violations for each region.

        The magnitude of each violation is plotted on the y-axis.
        Each scenario is plotted as a separate line.

        This method calls _violations() to create the figure.

        Returns:
            dict: Dictionary containing the created plot and its data table.
        """
        outputs = self._violations(**kwargs)
        return outputs
    def line_violations_totals(self, **kwargs):
        """Creates a barplot of total lineflow violations for each region.

        Each scenario is plotted as a separate bar.

        This method calls _violations() and passes the total_violations=True
        argument to create the figure.

        Returns:
            dict: Dictionary containing the created plot and its data table.
        """
        outputs = self._violations(total_violations=True, **kwargs)
        return outputs
    def _violations(
        self,
        total_violations: bool = False,
        timezone: str = "",
        start_date_range: str = None,
        end_date_range: str = None,
        **_,
    ):
        """Creates line violation plots, line plot and barplots.

        This method is called from line_violations_timeseries() and
        line_violations_totals().

        Args:
            total_violations (bool, optional): If True finds the sum of
                violations. Used to create barplots. Defaults to False.
            timezone (str, optional): The timezone to display on the x-axes.
                Defaults to "".
            start_date_range (str, optional): Defines a start date at which to
                represent data from. Defaults to None.
            end_date_range (str, optional): Defines an end date at which to
                represent data to. Defaults to None.

        Returns:
            dict: Dictionary containing the created plot and its data table.
        """
        outputs: dict = {}

        # List of properties needed by the plot, properties are a set of
        # tuples and contain 3 parts: required True/False, property name and
        # scenarios required, scenarios must be a list.
        properties = [(True, "line_Violation", self.Scenarios)]

        # Runs get_formatted_data within PlotDataStoreAndProcessor to populate
        # PlotDataStoreAndProcessor dictionary with all required properties,
        # returns a 1 if required data is missing
        check_input_data = self.get_formatted_data(properties)
        if 1 in check_input_data:
            return MissingInputData()

        for zone_input in self.Zones:
            logger.info(f"Zone = {zone_input}")

            scenario_df_list = []
            for scenario in self.Scenarios:
                logger.info(f"Scenario = {str(scenario)}")

                if self.AGG_BY == "zone":
                    lines = self.meta.zone_lines(scenario)
                else:
                    lines = self.meta.region_lines(scenario)

                line_v = self["line_Violation"].get(scenario)
                if pd.notna(start_date_range):
                    line_v = set_timestamp_date_range(
                        line_v, start_date_range, end_date_range
                    )
                    if line_v.empty is True:
                        logger.warning("No data in selected Date Range")
                        continue

                line_v = line_v.reset_index()
                viol = line_v.merge(lines, on="line_name", how="left")

                if self.AGG_BY == "zone":
                    viol = viol.groupby(["timestamp", "zone"]).sum()
                else:
                    viol = viol.groupby(["timestamp", self.AGG_BY]).sum()

                one_zone = viol.xs(zone_input, level=self.AGG_BY)
                one_zone = one_zone.rename(columns={"values": scenario})
                # We don't care about the direction of the violation
                one_zone = one_zone.abs()
                scenario_df_list.append(one_zone)

            all_scenarios = pd.concat(scenario_df_list, axis=1)

            # Remove columns that are all equal to 0
            all_scenarios = all_scenarios.loc[:, (all_scenarios != 0).any(axis=0)]

            if all_scenarios.empty:
                outputs[zone_input] = MissingZoneData()
                continue

            unitconversion = self.capacity_energy_unitconversion(
                all_scenarios, self.Scenarios
            )
            all_scenarios = all_scenarios / unitconversion["divisor"]

            data_table_out = all_scenarios.add_suffix(f" ({unitconversion['units']})")

            mplt = PlotLibrary()
            fig, ax = mplt.get_figure()

            if total_violations is True:
                all_scenarios_tot = all_scenarios.sum()

                # Set x-tick labels
                if self.custom_xticklabels:
                    tick_labels = self.custom_xticklabels
                else:
                    tick_labels = all_scenarios_tot.index
                mplt.barplot(
                    all_scenarios_tot,
                    color=self.color_list,
                    stacked=False,
                    custom_tick_labels=tick_labels,
                )
            else:
                for column in all_scenarios:
                    mplt.lineplot(
                        all_scenarios, column, color=self.color_list, label=column
                    )
                ax.margins(x=0.01)
                mplt.set_subplot_timeseries_format(minticks=6, maxticks=12)
                ax.set_xlabel(timezone, color="black", rotation="horizontal")
                mplt.add_legend()

            if plot_data_settings["plot_title_as_region"]:
                mplt.add_main_title(zone_input)
            ax.set_ylabel(
                f"Line violations ({unitconversion['units']})",
                color="black",
                rotation="vertical",
            )

            outputs[zone_input] = {"fig": fig, "data_table": data_table_out}
fig, "data_table": data_table_out} return outputs
    def net_export(
        self,
        timezone: str = "",
        start_date_range: str = None,
        end_date_range: str = None,
        **_,
    ):
        """Creates a timeseries net export line graph.

        Scenarios are plotted as separate lines.

        Args:
            timezone (str, optional): The timezone to display on the x-axes.
                Defaults to "".
            start_date_range (str, optional): Defines a start date at which to
                represent data from. Defaults to None.
            end_date_range (str, optional): Defines an end date at which to
                represent data to. Defaults to None.

        Returns:
            dict: Dictionary containing the created plot and its data table.
        """
        outputs: dict = {}

        if self.AGG_BY == "zone":
            agg = "zone"
        else:
            agg = "region"

        # List of properties needed by the plot, properties are a set of
        # tuples and contain 3 parts: required True/False, property name and
        # scenarios required, scenarios must be a list.
        properties = [(True, f"{agg}_Net_Interchange", self.Scenarios)]

        # Runs get_formatted_data within PlotDataStoreAndProcessor to populate
        # PlotDataStoreAndProcessor dictionary with all required properties,
        # returns a 1 if required data is missing
        check_input_data = self.get_formatted_data(properties)
        if 1 in check_input_data:
            return MissingInputData()

        for zone_input in self.Zones:
            logger.info(f"{self.AGG_BY} = {zone_input}")

            net_export_chunks = []
            for scenario in self.Scenarios:
                logger.info(f"Scenario = {scenario}")

                net_export_read = self[f"{agg}_Net_Interchange"].get(scenario)
                if shift_leapday:
                    net_export_read = adjust_for_leapday(net_export_read)

                net_export = net_export_read.xs(zone_input, level=self.AGG_BY)
                net_export = net_export.groupby("timestamp").sum()
                net_export.columns = [scenario]

                if pd.notna(start_date_range):
                    net_export = set_timestamp_date_range(
                        net_export, start_date_range, end_date_range
                    )
                    if net_export.empty is True:
                        logger.warning("No data in selected Date Range")
                        continue

                net_export_chunks.append(net_export)

            net_export_all_scenarios = pd.concat(net_export_chunks, axis=1)

            unitconversion = self.capacity_energy_unitconversion(
                net_export_all_scenarios, self.Scenarios
            )
            net_export_all_scenarios = (
                net_export_all_scenarios / unitconversion["divisor"]
            )

            # Data table of values to return to main program
            data_table_out = net_export_all_scenarios.add_suffix(
                f" ({unitconversion['units']})"
            )

            # Make scenario/color dictionary.
            color_dict = dict(zip(net_export_all_scenarios.columns, self.color_list))

            mplt = PlotLibrary()
            fig, ax = mplt.get_figure()
            plt.subplots_adjust(wspace=0.05, hspace=0.2)

            if net_export_all_scenarios.empty:
                out = MissingZoneData()
                outputs[zone_input] = out
                continue

            for column in net_export_all_scenarios:
                mplt.lineplot(
                    net_export_all_scenarios, column, color_dict, label=column
                )
            ax.set_ylabel(
                f'Net exports ({unitconversion["units"]})',
                color="black",
                rotation="vertical",
            )
            ax.set_xlabel(timezone, color="black", rotation="horizontal")
            ax.margins(x=0.01)
            ax.axhline(y=0, linestyle=":", color="gray")
            mplt.set_subplot_timeseries_format()
            mplt.add_legend(reverse_legend=True)
            if plot_data_settings["plot_title_as_region"]:
                mplt.add_main_title(zone_input)

            outputs[zone_input] = {"fig": fig, "data_table": data_table_out}

        return outputs
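    # Illustrative sketch (not part of the original module): net exports are
    # the zone's Net_Interchange rows summed per timestamp. With hypothetical
    # data carrying a (timestamp, zone) MultiIndex:
    #
    #   >>> import pandas as pd
    #   >>> idx = pd.MultiIndex.from_product(
    #   ...     [pd.to_datetime(["2024-01-01 00:00"]), ["Z1", "Z2"]],
    #   ...     names=["timestamp", "zone"],
    #   ... )
    #   >>> df = pd.DataFrame({"values": [7.0, -7.0]}, index=idx)
    #   >>> df.xs("Z1", level="zone").groupby("timestamp").sum()["values"].item()
    #   7.0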
    def zonal_interchange(
        self,
        figure_name: str = "zonal_interchange",
        start_date_range: str = None,
        end_date_range: str = None,
        **_,
    ):
        """Creates a line plot of the net interchange between each zone, with
        a facet for each zone.

        The method will only work if agg_by = "zone".

        The code will create either a timeseries or duration curve depending
        on if the word 'duration_curve' is in the figure_name.
        To make a duration curve, ensure the word 'duration_curve' is found in
        the figure_name.

        Args:
            figure_name (str, optional): User defined figure output name.
                Defaults to "zonal_interchange".
            start_date_range (str, optional): Defines a start date at which to
                represent data from. Defaults to None.
            end_date_range (str, optional): Defines an end date at which to
                represent data to. Defaults to None.

        Returns:
            dict: Dictionary containing the created plot and its data table.
        """
        if self.AGG_BY not in ["zone", "zones", "Zone", "Zones"]:
            logger.warning("This plot only supports aggregation zone")
            return UnsupportedAggregation()

        duration_curve = False
        if "duration_curve" in figure_name:
            duration_curve = True

        # List of properties needed by the plot, properties are a set of
        # tuples and contain 3 parts: required True/False, property name and
        # scenarios required, scenarios must be a list.
        properties = [(True, "line_Flow", self.Scenarios)]

        # Runs get_formatted_data within PlotDataStoreAndProcessor to populate
        # PlotDataStoreAndProcessor dictionary with all required properties,
        # returns a 1 if required data is missing
        check_input_data = self.get_formatted_data(properties)
        if 1 in check_input_data:
            return MissingInputData()

        outputs: dict = {}

        # Sets up x, y dimensions of plot
        ncols, nrows = set_facet_col_row_dimensions(
            self.xlabels, self.ylabels, multi_scenario=self.Scenarios
        )
        grid_size = ncols * nrows

        # Used to calculate any excess axis to delete
        plot_number = len(self.Scenarios)
        excess_axs = grid_size - plot_number

        for zone_input in self.Zones:
            logger.info(f"{self.AGG_BY} = {zone_input}")

            mplt = PlotLibrary(nrows, ncols, sharey=True, squeeze=False, ravel_axs=True)
            fig, axs = mplt.get_figure()
            plt.subplots_adjust(wspace=0.1, hspace=0.5)

            net_exports_all = []

            for n, scenario in enumerate(self.Scenarios):
                net_exports = []

                exp_lines = self.meta.zone_exporting_lines(scenario)
                imp_lines = self.meta.zone_importing_lines(scenario)

                if exp_lines.empty or imp_lines.empty:
                    return MissingMetaData()

                exp_lines.columns = ["region", "line_name"]
                imp_lines.columns = ["region", "line_name"]

                # Find list of lines that connect each region.
                exp_oz = exp_lines[exp_lines["region"] == zone_input]
                imp_oz = imp_lines[imp_lines["region"] == zone_input]

                other_zones = self.meta.zones(scenario).name.tolist()
                try:
                    other_zones.remove(zone_input)
                except ValueError:
                    logger.warning("Are you sure you set agg_by = zone?")

                logger.info(f"Scenario = {str(scenario)}")
                flow = self["line_Flow"][scenario].copy()
                if shift_leapday:
                    flow = adjust_for_leapday(flow)
                flow = flow.reset_index()

                for other_zone in other_zones:
                    exp_other_oz = exp_lines[exp_lines["region"] == other_zone]
                    imp_other_oz = imp_lines[imp_lines["region"] == other_zone]

                    exp_pair = pd.merge(
                        exp_oz, imp_other_oz, left_on="line_name", right_on="line_name"
                    )
                    imp_pair = pd.merge(
                        imp_oz, exp_other_oz, left_on="line_name", right_on="line_name"
                    )

                    # Swap columns for importing lines
                    imp_pair = imp_pair.reindex(
                        columns=["region_from", "line_name", "region_to"]
                    )

                    export = flow[flow["line_name"].isin(exp_pair["line_name"])]
                    imports = flow[flow["line_name"].isin(imp_pair["line_name"])]

                    export = export.groupby(["timestamp"]).sum()
                    imports = imports.groupby(["timestamp"]).sum()

                    # Check for situations where there are only exporting or
                    # importing lines for this zonal pair.
                    if imports.empty:
                        net_export = export
                    elif export.empty:
                        net_export = -imports
                    else:
                        net_export = export - imports
                    net_export.columns = [other_zone]

                    if pd.notna(start_date_range):
                        net_export = set_timestamp_date_range(
                            net_export, start_date_range, end_date_range
                        )
                        if net_export.empty is True:
                            logger.warning("No data in selected Date Range")
                            continue

                    if duration_curve:
                        net_export = sort_duration(net_export, other_zone)

                    net_exports.append(net_export)

                net_exports = pd.concat(net_exports, axis=1)
                net_exports = net_exports.dropna(axis="columns")
                net_exports.index = pd.to_datetime(net_exports.index)
                net_exports["Net export"] = net_exports.sum(axis=1)

                # Unit conversion based off peak export hour, only checked once
                if zone_input == self.Zones[0] and scenario == self.Scenarios[0]:
                    unitconversion = self.capacity_energy_unitconversion(
                        net_exports, self.Scenarios
                    )

                net_exports = net_exports / unitconversion["divisor"]

                if duration_curve:
                    net_exports = net_exports.reset_index().drop(columns="index")

                for column in net_exports:
                    linestyle = "--" if column == "Net export" else "solid"
                    mplt.lineplot(
                        net_exports,
                        column=column,
                        label=column,
                        sub_pos=n,
                        linestyle=linestyle,
                    )

                axs[n].margins(x=0.01)
                # Add horizontal line at 0.
                axs[n].axhline(y=0, linestyle=":", color="gray")

                if not duration_curve:
                    mplt.set_subplot_timeseries_format(sub_pos=n)

                # Add scenario column to output table.
                scenario_names = pd.Series(
                    [scenario] * len(net_exports), name="Scenario"
                )
                net_exports = net_exports.add_suffix(f" ({unitconversion['units']})")
                net_exports = net_exports.set_index([scenario_names], append=True)
                net_exports_all.append(net_exports)

            mplt.add_facet_labels(xlabels=self.xlabels, ylabels=self.ylabels)
            mplt.add_legend()

            # Remove extra axes
            mplt.remove_excess_axs(excess_axs, grid_size)

            if plot_data_settings["plot_title_as_region"]:
                mplt.add_main_title(zone_input)

            plt.ylabel(
                f"Net export ({unitconversion['units']})",
                color="black",
                rotation="vertical",
                labelpad=40,
            )
            if duration_curve:
                plt.xlabel("Sorted Interval", color="black", labelpad=30)

            data_table_out = pd.concat(net_exports_all)

            # If plotting all scenarios return figures to plot_main
            outputs[zone_input] = {"fig": fig, "data_table": data_table_out}

        return outputs
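    # Illustrative sketch (not part of the original module): a line counts
    # toward the Z1->Z2 export position only if Z1 exports on it and Z2
    # imports on it, which the inner merge on line_name enforces. With
    # hypothetical metadata:
    #
    #   >>> import pandas as pd
    #   >>> exp_oz = pd.DataFrame({"region": ["Z1", "Z1"], "line_name": ["L1", "L2"]})
    #   >>> imp_other_oz = pd.DataFrame({"region": ["Z2"], "line_name": ["L1"]})
    #   >>> pd.merge(exp_oz, imp_other_oz, on="line_name")["line_name"].tolist()
    #   ['L1']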
    def zonal_interchange_total(
        self, start_date_range: str = None, end_date_range: str = None, **_
    ):
        """Creates a barplot of the net interchange between each zone,
        separated by positive and negative flows.

        The method will only work if agg_by = "zone".

        Args:
            start_date_range (str, optional): Defines a start date at which to
                represent data from. Defaults to None.
            end_date_range (str, optional): Defines an end date at which to
                represent data to. Defaults to None.

        Returns:
            dict: Dictionary containing the created plot and its data table.
        """
        if self.AGG_BY not in ["zone", "zones", "Zone", "Zones"]:
            logger.warning("This plot only supports aggregation zone")
            return UnsupportedAggregation()

        # List of properties needed by the plot, properties are a set of
        # tuples and contain 3 parts: required True/False, property name and
        # scenarios required, scenarios must be a list.
        properties = [(True, "line_Flow", self.Scenarios)]

        # Runs get_formatted_data within PlotDataStoreAndProcessor to populate
        # PlotDataStoreAndProcessor dictionary with all required properties,
        # returns a 1 if required data is missing
        check_input_data = self.get_formatted_data(properties)
        if 1 in check_input_data:
            return MissingInputData()

        outputs: dict = {}

        for zone_input in self.Zones:
            logger.info(f"{self.AGG_BY} = {zone_input}")

            mplt = PlotLibrary()
            fig, ax = mplt.get_figure()
            plt.subplots_adjust(wspace=0.05, hspace=0.2)

            net_exports_all = []
            # Holds each scenario output table
            data_out_chunk = []

            for n, scenario in enumerate(self.Scenarios):
                exp_lines = self.meta.zone_exporting_lines(scenario)
                imp_lines = self.meta.zone_importing_lines(scenario)

                if exp_lines.empty or imp_lines.empty:
                    return MissingMetaData()

                exp_lines.columns = ["region", "line_name"]
                imp_lines.columns = ["region", "line_name"]

                # Find list of lines that connect each region.
                exp_oz = exp_lines[exp_lines["region"] == zone_input]
                imp_oz = imp_lines[imp_lines["region"] == zone_input]

                other_zones = self.meta.zones(scenario).name.tolist()
                other_zones.remove(zone_input)

                net_exports = []
                logger.info(f"Scenario = {str(scenario)}")
                flow = self["line_Flow"][scenario]
                flow = flow.reset_index()

                for other_zone in other_zones:
                    exp_other_oz = exp_lines[exp_lines["region"] == other_zone]
                    imp_other_oz = imp_lines[imp_lines["region"] == other_zone]

                    exp_pair = pd.merge(
                        exp_oz, imp_other_oz, left_on="line_name", right_on="line_name"
                    )
                    imp_pair = pd.merge(
                        imp_oz, exp_other_oz, left_on="line_name", right_on="line_name"
                    )

                    # Swap columns for importing lines
                    imp_pair = imp_pair.reindex(
                        columns=["region_from", "line_name", "region_to"]
                    )

                    export = flow[flow["line_name"].isin(exp_pair["line_name"])]
                    imports = flow[flow["line_name"].isin(imp_pair["line_name"])]

                    export = export.groupby(["timestamp"]).sum()
                    imports = imports.groupby(["timestamp"]).sum()

                    # Check for situations where there are only exporting or
                    # importing lines for this zonal pair.
                    if imports.empty:
                        net_export = export
                    elif export.empty:
                        net_export = -imports
                    else:
                        net_export = export - imports
                    net_export.columns = [other_zone]

                    if pd.notna(start_date_range):
                        net_export = set_timestamp_date_range(
                            net_export, start_date_range, end_date_range
                        )
                        if net_export.empty is True:
                            logger.warning("No data in selected Date Range")
                            continue

                    net_exports.append(net_export)

                net_exports = pd.concat(net_exports, axis=1)
                net_exports = net_exports.dropna(axis="columns")
                net_exports.index = pd.to_datetime(net_exports.index)
                net_exports["Net Export"] = net_exports.sum(axis=1)

                positive = net_exports.agg(lambda x: x[x > 0].sum())
                negative = net_exports.agg(lambda x: x[x < 0].sum())

                both = pd.concat([positive, negative], axis=1)
                both.columns = ["Total Export", "Total Import"]

                # Unit conversion based off peak export hour, only checked once
                if scenario == self.Scenarios[0]:
                    unitconversion = self.capacity_energy_unitconversion(
                        both, self.Scenarios
                    )

                both = both / unitconversion["divisor"]
                net_exports_all.append(both)

                # Add scenario column to output table.
                scenario_names = pd.Series([scenario] * len(both), name="Scenario")
                data_table = both.set_index([scenario_names], append=True)
                data_table = data_table.add_suffix(f" ({unitconversion['units']})")
                data_out_chunk.append(data_table)

            data_table_out = pd.concat(data_out_chunk)

            # Make scenario/color dictionary.
            color_dict = dict(zip(self.Scenarios, self.color_list))

            mplt.clustered_stacked_barplot(
                net_exports_all, labels=self.Scenarios, color_dict=color_dict
            )
            ax.axhline(y=0, linestyle=":", color="gray")
            ax.set_ylabel(
                f"Interchange ({unitconversion['units']}h)",
                color="black",
                rotation="vertical",
            )
            if plot_data_settings["plot_title_as_region"]:
                mplt.add_main_title(zone_input)

            outputs[zone_input] = {"fig": fig, "data_table": data_table_out}

        return outputs
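    # Illustrative sketch (not part of the original module): the agg calls in
    # zonal_interchange_total split each zone-to-zone column into its
    # exporting and importing intervals before summing:
    #
    #   >>> import pandas as pd
    #   >>> net = pd.Series([5.0, -2.0, 1.0])
    #   >>> float(net[net > 0].sum()), float(net[net < 0].sum())
    #   (6.0, -2.0)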
    def total_int_flow_ind(
        self, prop: str, start_date_range: str = None, end_date_range: str = None, **_
    ):
        """Creates a clustered barplot of the total flow for a specific
        interface, separated by positive and negative flows.

        Specify the interface(s) of interest by providing a comma separated
        string to the property entry.
        Scenarios are clustered together as different colored bars.
        If multiple interfaces are provided, each will be plotted as a
        separate group of clustered bars.

        Args:
            prop (str): Comma separated string of interfaces to plot.
            start_date_range (str, optional): Defines a start date at which to
                represent data from. Defaults to None.
            end_date_range (str, optional): Defines an end date at which to
                represent data to. Defaults to None.

        Returns:
            DataSavedInModule: DataSavedInModule exception.
        """
        # List of properties needed by the plot, properties are a set of
        # tuples and contain 3 parts: required True/False, property name and
        # scenarios required, scenarios must be a list.
        properties = [(True, "interface_Flow", self.Scenarios)]

        # Runs get_formatted_data within PlotDataStoreAndProcessor to populate
        # PlotDataStoreAndProcessor dictionary with all required properties,
        # returns a 1 if required data is missing
        check_input_data = self.get_formatted_data(properties)
        if 1 in check_input_data:
            return MissingInputData()

        # Select only interfaces specified in Marmot_plot_select.csv.
        if prop is None:
            return InputSheetError()
        # Strip any whitespace around the interface names
        select_ints = [i.strip() for i in prop.split(",")]

        logger.info("Plotting only the interfaces specified in Marmot_plot_select.csv")
        logger.info(select_ints)

        mplt = PlotLibrary()
        fig, ax = mplt.get_figure()
        plt.subplots_adjust(wspace=0.05, hspace=0.2)

        net_flows_all = []
        # Holds each scenario output table
        data_out_chunk = []

        for scenario in self.Scenarios:
            logger.info(f"Scenario = {str(scenario)}")
            flow_all = self["interface_Flow"][scenario]
            both_chunk = []
            available_inter = select_ints.copy()

            for inter in select_ints:
                if inter not in flow_all.index.get_level_values("interface_name"):
                    logger.info(f"{inter} Not in Data")
                    available_inter.remove(inter)
                    continue

                flow = flow_all.xs(inter, level="interface_name")

                if pd.notna(start_date_range):
                    flow = set_timestamp_date_range(
                        flow, start_date_range, end_date_range
                    )
                    if flow.empty is True:
                        logger.warning("No data in selected Date Range")
                        continue

                flow = flow["values"]

                pos_sing = pd.Series(flow.where(flow > 0).sum(), name="Total Export")
                neg_sing = pd.Series(flow.where(flow < 0).sum(), name="Total Import")
                both_df = pd.concat([pos_sing, neg_sing], axis=1)
                both_chunk.append(both_df)

            both = pd.concat(both_chunk)
            both.columns = ["Total Export", "Total Import"]

            if scenario == self.Scenarios[0]:
                unitconversion = self.capacity_energy_unitconversion(
                    both, self.Scenarios
                )
            both = both / unitconversion["divisor"]

            both.index = available_inter
            net_flows_all.append(both)

            # Add scenario column to output table.
            scenario_names = pd.Series([scenario] * len(both), name="Scenario")
            data_table = both.set_index([scenario_names], append=True)
            data_table = data_table.add_suffix(f" ({unitconversion['units']})")
            data_out_chunk.append(data_table)

        data_table_out = pd.concat(data_out_chunk)

        # Make scenario/color dictionary.
        color_dict = dict(zip(self.Scenarios, self.color_list))

        mplt.clustered_stacked_barplot(
            net_flows_all, labels=self.Scenarios, color_dict=color_dict
        )
        ax.axhline(y=0, linestyle=":", color="gray")
        ax.set_ylabel(
            f"Flow ({unitconversion['units']}h)",
            color="black",
            rotation="vertical",
        )
        ax.set_xlabel("")

        fig.savefig(
            self.figure_folder.joinpath(
                f"{self.AGG_BY}_transmission", "Individual_Interface_Total_Flow.svg"
            ),
            dpi=600,
            bbox_inches="tight",
        )
        data_table_out.to_csv(
            self.figure_folder.joinpath(
                f"{self.AGG_BY}_transmission", "Individual_Interface_Total_Flow.csv"
            )
        )
        outputs = DataSavedInModule()
        return outputs
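    # Illustrative sketch (not part of the original module): the property
    # entry is parsed by splitting on commas, and stripping whitespace up
    # front keeps the names matched against the interface index:
    #
    #   >>> [i.strip() for i in "Interface_A, Interface_B".split(",")]
    #   ['Interface_A', 'Interface_B']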
    def get_line_interface_limits(
        self, extra_property_names: list, object_name: str = None
    ) -> pd.DataFrame:
        """Get and process line and interface limits.

        Gets limits for all scenarios. If limits are equal across all
        scenarios, only a single set of limits is returned; otherwise limits
        are returned for each scenario.

        Args:
            extra_property_names (list): List of limit property names.
            object_name (str, optional): Name of line or interface if only a
                single object is needed. Defaults to None.

        Returns:
            pd.DataFrame: Dataframe of limits.
        """
        extra_data_frames = []
        for ext_prop in extra_property_names:
            scen_chunk = []
            for scenario in self.Scenarios:
                df: pd.DataFrame = self[ext_prop].get(scenario)
                if df.empty is False:
                    if object_name:
                        df = df.xs(
                            object_name,
                            level=next(n for n in df.index.names if "_name" in n),
                        )
                        df = df.groupby(["timestamp"]).sum()
                    # Filter out unenforced lines/interfaces.
                    df = df[df["values"].abs() < 99998]
                    df = df.rename(columns={"values": ext_prop})
                    scenario_names = pd.Series([scenario] * len(df), name="Scenario")
                    df = df.set_index(scenario_names, append=True)
                    scen_chunk.append(df)

            all_scenario_data = pd.concat(scen_chunk, axis=0)
            if all_scenario_data.empty:
                date_index = pd.date_range(
                    start="2010-01-01", periods=1, freq="H", name="timestamp"
                )
                extra_data_frames.append(
                    pd.DataFrame(data=[0], index=date_index, columns=[ext_prop])
                )
            else:
                compare_limits = (
                    all_scenario_data.groupby("Scenario").mean().to_numpy()
                )
                if (compare_limits[0] == compare_limits).all():
                    scen = all_scenario_data.index.get_level_values("Scenario")[0]
                    extra_data_frames.append(
                        all_scenario_data.xs(scen, level="Scenario")
                    )
                else:
                    extra_data_frames.append(all_scenario_data)

        return pd.concat(extra_data_frames, axis=1).fillna(0)
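    # Illustrative sketch (not part of the original module): production-cost
    # results often encode an unenforced limit with a large sentinel value
    # such as +/-99999 (an assumption here), which the abs() < 99998 filter
    # above discards:
    #
    #   >>> import pandas as pd
    #   >>> df = pd.DataFrame({"values": [1500.0, 99999.0, -99999.0]})
    #   >>> df[df["values"].abs() < 99998]["values"].tolist()
    #   [1500.0]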
    def plot_line_interface_limits(
        self,
        limits: pd.DataFrame,
        mplt: PlotLibrary,
        n: int,
        duration_curve: bool = False,
    ) -> None:
        """Plots line/interface limits on a subplot.

        Args:
            limits (pd.DataFrame): Dataframe of import/export limits with a
                timeseries.
            mplt (PlotLibrary): Instance of PlotLibrary.
            n (int): Counter for subplots.
            duration_curve (bool, optional): When plotting a duration curve.
                If True, uses the max of the limit values. Defaults to False.
        """
        limits_dict = {}
        linestyles = ["-.", "--"]
        if (
            limits.filter(like="Export").to_numpy().sum() > 0
            or limits.filter(like="Import").to_numpy().sum() < 0
        ):
            limits_dict["Export Limit"] = limits.filter(like="Export", axis=1)
            limits_dict["Import Limit"] = limits.filter(like="Import", axis=1)

        for scenario in self.Scenarios:
            for l, limit_type in enumerate(limits_dict):
                if "Scenario" in limits.index.names:
                    limit_values = limits_dict[limit_type].xs(
                        scenario, level="Scenario"
                    )
                    legend_label = f"{scenario}\n{limit_type}"
                else:
                    limit_values = limits_dict[limit_type]
                    legend_label = limit_type

                if limit_values.abs().to_numpy().max() == 0:
                    continue
                else:
                    if duration_curve or (
                        len(limits.index.get_level_values("timestamp")) == 1
                    ):
                        mplt.axs[n].axhline(
                            y=limit_values.to_numpy().max(),
                            linestyle=linestyles[l],
                            label=legend_label,
                        )
                    else:
                        mplt.lineplot(
                            limit_values.squeeze(),
                            linestyle=linestyles[l],
                            label=legend_label,
                            sub_pos=n,
                        )

            # Limits are identical across scenarios; plot them once only.
            if "Scenario" not in limits.index.names:
                break
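    # Illustrative usage sketch (not part of the original module): inside a
    # plotting method, the two helpers above are expected to pair roughly as
    # follows for a hypothetical interface name; mplt and n would come from
    # the calling method's PlotLibrary facet loop:
    #
    #   limits = self.get_line_interface_limits(
    #       ["interface_Import_Limit", "interface_Export_Limit"],
    #       object_name="Hypothetical_Interface",
    #   )
    #   self.plot_line_interface_limits(limits, mplt, n, duration_curve=False)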