Source code for marmot.plottingmodules.curtailment

# -*- coding: utf-8 -*-
"""Device curtailment plots.

This module creates plots are related to the curtailment of generators.

@author: Daniel Levie
"""

import logging
from pathlib import Path
from typing import List

import pandas as pd

import marmot.utils.mconfig as mconfig
from marmot.plottingmodules.plotutils.plot_data_helper import (
    GenCategories,
    PlotDataStoreAndProcessor,
)
from marmot.plottingmodules.plotutils.plot_exceptions import (
    DataSavedInModule,
    MissingInputData,
    MissingZoneData,
    UnderDevelopment,
)
from marmot.plottingmodules.plotutils.plot_library import PlotLibrary, SetupSubplot
from marmot.plottingmodules.plotutils.styles import (
    ColorList,
    GeneratorColorDict,
    PlotMarkers,
)
from marmot.plottingmodules.plotutils.timeseries_modifiers import (
    get_sub_hour_interval_count,
    set_timestamp_date_range,
)

logger = logging.getLogger("plotter." + __name__)
plot_data_settings: dict = mconfig.parser("plot_data")
curtailment_prop: str = mconfig.parser("plot_data", "curtailment_property")


[docs]class Curtailment(PlotDataStoreAndProcessor): """Device curtailment plots. The curtailment.py module contains methods that are related to the curtailment of generators . Curtailment inherits from the PlotDataStoreAndProcessor class to assist in creating figures. """ def __init__( self, Zones: List[str], Scenarios: List[str], AGG_BY: str, ordered_gen: List[str], marmot_solutions_folder: Path, gen_categories: GenCategories = GenCategories(), marmot_color_dict: dict = None, custom_xticklabels: List[str] = None, color_list: list = ColorList().colors, marker_style: List = PlotMarkers().markers, **kwargs, ): """ Args: Zones (List[str]): List of regions/zones to plot. Scenarios (List[str]): List of scenarios to plot. AGG_BY (str): Informs region type to aggregate by when creating plots. ordered_gen (List[str]): Ordered list of generator technologies to plot, order defines the generator technology position in stacked bar and area plots. marmot_solutions_folder (Path): Directory containing Marmot solution outputs. gen_categories (GenCategories): Instance of GenCategories class, groups generator technologies into defined categories. Deafults to GenCategories. marmot_color_dict (dict, optional): Dictionary of colors to use for generation technologies. Defaults to None. custom_xticklabels (List[str], optional): List of custom x labels to apply to barplots. Values will overwite existing ones. Defaults to None. color_list (list, optional): List of colors to apply to non-gen plots. Defaults to ColorList().colors. marker_style (List, optional): List of markers for plotting. Defaults to PlotMarkers().markers. """ # Instantiation of PlotDataStoreAndProcessor super().__init__(AGG_BY, ordered_gen, marmot_solutions_folder, **kwargs) self.Zones = Zones self.Scenarios = Scenarios self.gen_categories = gen_categories if marmot_color_dict is None: self.marmot_color_dict = GeneratorColorDict.set_random_colors( self.ordered_gen ).color_dict else: self.marmot_color_dict = marmot_color_dict self.custom_xticklabels = custom_xticklabels self.color_list = color_list self.marker_style = marker_style
[docs] def curt_duration_curve( self, prop: str = None, start_date_range: str = None, end_date_range: str = None, **_, ): """Curtailment duration curve (line plot) Displays curtailment sorted from highest occurrence to lowest over given time period. Args: prop (str, optional): Controls type of re to include in plot. Controlled through the plot_select.csv. Defaults to None. start_date_range (str, optional): Defines a start date at which to represent data from. Defaults to None. end_date_range (str, optional): Defines a end date at which to represent data to. Defaults to None. Returns: dict: dictionary containing the created plot and its data table. """ outputs: dict = {} # List of properties needed by the plot, properties are a set of tuples and # contain 3 parts: required True/False, property name and scenarios required, # scenarios must be a list. properties = [(True, f"generator_{curtailment_prop}", self.Scenarios)] # Runs get_formatted_data within PlotDataStoreAndProcessor to populate PlotDataStoreAndProcessor dictionary # with all required properties, returns a 1 if required data is missing check_input_data = self.get_formatted_data(properties) if 1 in check_input_data: return MissingInputData() for zone_input in self.Zones: logger.info(f"{self.AGG_BY} = {zone_input}") RE_Curtailment_DC = pd.DataFrame() PV_Curtailment_DC = pd.DataFrame() for scenario in self.Scenarios: logger.info(f"Scenario = {scenario}") re_curt = self[f"generator_{curtailment_prop}"].get(scenario) # Timeseries [MW] RE curtailment [MWh] try: # Check for regions missing all generation. re_curt = re_curt.xs(zone_input, level=self.AGG_BY) except KeyError: logger.info(f"No curtailment in {zone_input}") continue re_curt = self.df_process_gen_inputs(re_curt) # If using Marmot's curtailment property if curtailment_prop == "Curtailment": re_curt = self.assign_curtailment_techs( re_curt, self.gen_categories.vre ) # Timeseries [MW] PV curtailment [MWh] pv_curt = re_curt[re_curt.columns.intersection(self.gen_categories.pv)] re_curt = re_curt.sum(axis=1) pv_curt = pv_curt.sum(axis=1) re_curt = re_curt.squeeze() # Convert to Series pv_curt = pv_curt.squeeze() # Convert to Series if pd.notna(start_date_range): re_curt, pv_curt = set_timestamp_date_range( [re_curt, pv_curt], start_date_range, end_date_range ) if re_curt.empty or pv_curt.empty: logger.warning("No curtailment in selected Date Range") continue # Sort from larget to smallest re_cdc = re_curt.sort_values(ascending=False).reset_index(drop=True) pv_cdc = pv_curt.sort_values(ascending=False).reset_index(drop=True) re_cdc.rename(scenario, inplace=True) pv_cdc.rename(scenario, inplace=True) RE_Curtailment_DC = pd.concat( [RE_Curtailment_DC, re_cdc], axis=1, sort=False ) PV_Curtailment_DC = pd.concat( [PV_Curtailment_DC, pv_cdc], axis=1, sort=False ) # Remove columns that have values less than 1 RE_Curtailment_DC = RE_Curtailment_DC.loc[ :, (RE_Curtailment_DC >= 1).any(axis=0) ] PV_Curtailment_DC = PV_Curtailment_DC.loc[ :, (PV_Curtailment_DC >= 1).any(axis=0) ] # Replace _ with white space RE_Curtailment_DC.columns = RE_Curtailment_DC.columns.str.replace("_", " ") PV_Curtailment_DC.columns = PV_Curtailment_DC.columns.str.replace("_", " ") # Create Dictionary from scenario names and color list colour_dict = dict(zip(RE_Curtailment_DC.columns, self.color_list)) mplt = SetupSubplot() fig, ax = mplt.get_figure() if prop == "PV": if PV_Curtailment_DC.empty: out = MissingZoneData() outputs[zone_input] = out continue # unit conversion return divisor and energy units unitconversion = self.capacity_energy_unitconversion( PV_Curtailment_DC, self.Scenarios ) PV_Curtailment_DC = PV_Curtailment_DC / unitconversion["divisor"] Data_Table_Out = PV_Curtailment_DC Data_Table_Out = Data_Table_Out.add_suffix( f" ({unitconversion['units']})" ) x_axis_lim = 1.25 * len(PV_Curtailment_DC) for column in PV_Curtailment_DC: ax.plot( PV_Curtailment_DC[column], linewidth=3, color=colour_dict[column], label=column, ) ax.set_ylabel( f"PV Curtailment ({unitconversion['units']})", color="black", rotation="vertical", ) if prop == "PV+Wind": if RE_Curtailment_DC.empty: out = MissingZoneData() outputs[zone_input] = out continue # unit conversion return divisor and energy units unitconversion = self.capacity_energy_unitconversion( RE_Curtailment_DC, self.Scenarios ) RE_Curtailment_DC = RE_Curtailment_DC / unitconversion["divisor"] Data_Table_Out = RE_Curtailment_DC Data_Table_Out = Data_Table_Out.add_suffix( f" ({unitconversion['units']})" ) x_axis_lim = 1.25 * len(RE_Curtailment_DC) for column in RE_Curtailment_DC: ax.plot( RE_Curtailment_DC[column], linewidth=3, color=colour_dict[column], label=column, ) ax.set_ylabel( f"PV + Wind Curtailment ({unitconversion['units']})", color="black", rotation="vertical", ) ax.set_xlabel("Hours", color="black", rotation="horizontal") mplt.set_yaxis_major_tick_format() ax.margins(x=0.01) # ax.set_xlim(0, 9490) ax.set_xlim(0, x_axis_lim) ax.set_ylim(bottom=0) # Add legend mplt.add_legend() # Set title if plot_data_settings["plot_title_as_region"]: mplt.add_main_title(zone_input) outputs[zone_input] = {"fig": fig, "data_table": Data_Table_Out} return outputs
[docs] def curt_pen( self, prop: str = None, start_date_range: str = None, end_date_range: str = None, **_, ): """Plot of curtailment vs penetration. Each scenario is represented by a different symbel on a x, y axis Args: prop (str, optional): Controls type of re to include in plot. Controlled through the plot_select.csv. Defaults to None. start_date_range (str, optional): Defines a start date at which to represent data from. Defaults to None. end_date_range (str, optional): Defines a end date at which to represent data to. Defaults to None. Returns: dict: dictionary containing the created plot and its data table. """ outputs: dict = {} # List of properties needed by the plot, properties are a set of tuples and # contain 3 parts: required True/False, property name and scenarios required, # scenarios must be a list. properties = [ (True, "generator_Generation", self.Scenarios), (True, "generator_Available_Capacity", self.Scenarios), (True, f"generator_{curtailment_prop}", self.Scenarios), (True, "generator_Total_Generation_Cost", self.Scenarios), ] # Runs get_formatted_data within PlotDataStoreAndProcessor to populate PlotDataStoreAndProcessor dictionary # with all required properties, returns a 1 if required data is missing check_input_data = self.get_formatted_data(properties) if 1 in check_input_data: return MissingInputData() for zone_input in self.Zones: Penetration_Curtailment_out = pd.DataFrame() logger.info(f"{self.AGG_BY } = {zone_input}") for scenario in self.Scenarios: logger.info(f"Scenario = {scenario}") gen = self["generator_Generation"].get(scenario) try: # Check for regions missing all generation. gen = gen.xs(zone_input, level=self.AGG_BY) except KeyError: logger.info(f"No generation in {zone_input}") continue avail_gen = self["generator_Available_Capacity"].get(scenario) avail_gen = avail_gen.xs(zone_input, level=self.AGG_BY) re_curt = self[f"generator_{curtailment_prop}"].get(scenario) try: re_curt = re_curt.xs(zone_input, level=self.AGG_BY) except KeyError: logger.info(f"No curtailment in {zone_input}") continue re_curt = self.df_process_gen_inputs(re_curt) # If using Marmot's curtailment property if curtailment_prop == "Curtailment": re_curt = self.assign_curtailment_techs( re_curt, self.gen_categories.vre ) # Total generation cost Total_Gen_Cost = self["generator_Total_Generation_Cost"].get(scenario) Total_Gen_Cost = Total_Gen_Cost.xs(zone_input, level=self.AGG_BY) if pd.notna(start_date_range): ( gen, avail_gen, re_curt, Total_Gen_Cost, ) = set_timestamp_date_range( [gen, avail_gen, re_curt, Total_Gen_Cost], start_date_range, end_date_range, ) if re_curt.empty: logger.warning("No curtailment in selected Date Range") continue # Finds the number of unique hours in the year no_hours_year = len(gen.index.unique(level="timestamp")) # Total generation across all technologies [MWh] total_gen = float(gen.sum()) # Timeseries [MW] and Total VRE generation [MWh] vre_gen = gen.loc[gen.index.isin(self.gen_categories.vre, level="tech")] total_vre_gen = float(vre_gen.sum()) # Timeseries [MW] and Total RE generation [MWh] re_gen = gen.loc[gen.index.isin(self.gen_categories.re, level="tech")] total_re_gen = float(re_gen.sum()) # Timeseries [MW] and Total PV generation [MWh] pv_gen = gen.loc[gen.index.isin(self.gen_categories.pv, level="tech")] total_pv_gen = float(pv_gen.sum()) # % Penetration of generation classes across the year VRE_Penetration = (total_vre_gen / total_gen) * 100 RE_Penetration = (total_re_gen / total_gen) * 100 PV_Penetration = (total_pv_gen / total_gen) * 100 # Timeseries [MW] and Total RE available [MWh] re_avail = avail_gen.loc[ avail_gen.index.isin(self.gen_categories.re, level="tech") ] total_re_avail = float(re_avail.sum()) # Timeseries [MW] and Total PV available [MWh] pv_avail = avail_gen.loc[ avail_gen.index.isin(self.gen_categories.pv, level="tech") ] total_pv_avail = float(pv_avail.sum()) # Total RE curtailment [MWh] total_re_curt = float(re_curt.sum().sum()) # Timeseries [MW] and Total PV curtailment [MWh] pv_curt = re_curt[re_curt.columns.intersection(self.gen_categories.pv)] total_pv_curt = float(pv_curt.sum().sum()) # % of hours with curtailment Prct_hr_RE_curt = ( len((re_curt.sum(axis=1)).loc[(re_curt.sum(axis=1)) > 0]) / no_hours_year ) * 100 Prct_hr_PV_curt = ( len((pv_curt.sum(axis=1)).loc[(pv_curt.sum(axis=1)) > 0]) / no_hours_year ) * 100 # Max instantaneous curtailment if re_curt.empty == True: continue else: Max_RE_Curt = max(re_curt.sum(axis=1)) if pv_curt.empty == True: continue else: Max_PV_Curt = max(pv_curt.sum(axis=1)) # % RE and PV Curtailment Capacity Factor if total_pv_curt > 0: RE_Curt_Cap_factor = (total_re_curt / Max_RE_Curt) / no_hours_year PV_Curt_Cap_factor = (total_pv_curt / Max_PV_Curt) / no_hours_year else: RE_Curt_Cap_factor = 0 PV_Curt_Cap_factor = 0 # % Curtailment across the year if total_re_avail == 0: continue else: Prct_RE_curt = (total_re_curt / total_re_avail) * 100 if total_pv_avail == 0: continue else: Prct_PV_curt = (total_pv_curt / total_pv_avail) * 100 Total_Gen_Cost = float(Total_Gen_Cost.sum()) vg_out = pd.Series( [ PV_Penetration, RE_Penetration, VRE_Penetration, Max_PV_Curt, Max_RE_Curt, Prct_PV_curt, Prct_RE_curt, Prct_hr_PV_curt, Prct_hr_RE_curt, PV_Curt_Cap_factor, RE_Curt_Cap_factor, Total_Gen_Cost, ], index=[ "% PV Penetration", "% RE Penetration", "% VRE Penetration", "Max PV Curtailment [MW]", "Max RE Curtailment [MW]", "% PV Curtailment", "% RE Curtailment", "% PV hrs Curtailed", "% RE hrs Curtailed", "PV Curtailment Capacity Factor", "RE Curtailment Capacity Factor", "Gen Cost", ], ) vg_out = vg_out.rename(scenario) Penetration_Curtailment_out = pd.concat( [Penetration_Curtailment_out, vg_out], axis=1, sort=False ) Penetration_Curtailment_out = Penetration_Curtailment_out.T # Data table of values to return to main program Data_Table_Out = Penetration_Curtailment_out VG_index = pd.Series(Penetration_Curtailment_out.index) # VG_index = VG_index.str.split(n=1, pat="_", expand=True) # VG_index.rename(columns = {0:"Scenario"}, inplace=True) VG_index.rename("Scenario", inplace=True) # VG_index = VG_index["Scenario"] Penetration_Curtailment_out.loc[:, "Scenario"] = VG_index[ :, ].values marker_dict = dict(zip(VG_index.unique(), self.marker_style)) colour_dict = dict(zip(VG_index.unique(), self.color_list)) Penetration_Curtailment_out["color"] = [ colour_dict.get(x, "#333333") for x in Penetration_Curtailment_out.Scenario ] Penetration_Curtailment_out["marker"] = [ marker_dict.get(x, ".") for x in Penetration_Curtailment_out.Scenario ] if Penetration_Curtailment_out.empty: logger.warning(f"No Generation in {zone_input}") out = MissingZoneData() outputs[zone_input] = out continue mplt = SetupSubplot() fig, ax = mplt.get_figure() for _, row in Penetration_Curtailment_out.iterrows(): if prop == "PV": ax.scatter( row["% PV Penetration"], row["% PV Curtailment"], marker=row["marker"], c=row["color"], s=100, label=row["Scenario"], ) ax.set_ylabel( "% PV Curtailment", color="black", rotation="vertical" ) ax.set_xlabel( "% PV Penetration", color="black", rotation="horizontal" ) elif prop == "PV+Wind": ax.scatter( row["% RE Penetration"], row["% RE Curtailment"], marker=row["marker"], c=row["color"], s=40, label=row["Scenario"], ) ax.set_ylabel( "% PV + Wind Curtailment", color="black", rotation="vertical" ) ax.set_xlabel( "% PV + Wind Penetration", color="black", rotation="horizontal" ) ax.set_ylim(bottom=0) ax.margins(x=0.01) if plot_data_settings["plot_title_as_region"]: mplt.add_main_title(zone_input) # Add legend mplt.add_legend() outputs[zone_input] = {"fig": fig, "data_table": Data_Table_Out} return outputs
[docs] def curt_total( self, start_date_range: str = None, end_date_range: str = None, scenario_groupby: str = "Scenario", **_, ): """Creates stacked barplots of total curtailment by technology. A separate bar is created for each scenario. Args: start_date_range (str, optional): Defines a start date at which to represent data from. Defaults to None. end_date_range (str, optional): Defines a end date at which to represent data to. Defaults to None. scenario_groupby (str, optional): Specifies whether to group data by Scenario or Year-Sceanrio. If grouping by Year-Sceanrio the year will be identified from the timestamp and appeneded to the sceanrio name. This is useful when plotting data which covers multiple years such as ReEDS. Defaults to Scenario. .. versionadded:: 0.10.0 Returns: dict: dictionary containing the created plot and its data table. """ outputs: dict = {} # List of properties needed by the plot, properties are a set of tuples and # contain 3 parts: required True/False, property name and scenarios required, # scenarios must be a list. properties = [ (True, f"generator_{curtailment_prop}", self.Scenarios), (False, "generator_Available_Capacity", self.Scenarios), ] # Runs get_formatted_data within PlotDataStoreAndProcessor to populate PlotDataStoreAndProcessor dictionary # with all required properties, returns a 1 if required data is missing check_input_data = self.get_formatted_data(properties) if 1 in check_input_data: return MissingInputData() for zone_input in self.Zones: logger.info(f"{self.AGG_BY} = {zone_input}") vre_curt_chunks = [] avail_gen_chunks = [] for scenario in self.Scenarios: logger.info(f"Scenario = {scenario}") vre_curt = self[f"generator_{curtailment_prop}"].get(scenario) try: vre_curt = vre_curt.xs(zone_input, level=self.AGG_BY) except KeyError: logger.info(f"No curtailment in {zone_input}") continue vre_curt = self.df_process_gen_inputs(vre_curt) # If using Marmot's curtailment property if curtailment_prop == "Curtailment": vre_curt = self.assign_curtailment_techs( vre_curt, self.gen_categories.vre ) avail_gen = self["generator_Available_Capacity"].get(scenario) if avail_gen.empty: avail_gen = self[f"generator_{curtailment_prop}"][scenario].copy() avail_gen.iloc[:, 0] = 0 try: # Check for regions missing all generation. avail_gen = avail_gen.xs(zone_input, level=self.AGG_BY) except KeyError: logger.info(f"No available generation in {zone_input}") continue avail_gen = self.df_process_gen_inputs(avail_gen) # If using Marmot's curtailment property if curtailment_prop == "Curtailment": avail_gen = self.assign_curtailment_techs( avail_gen, self.gen_categories.vre ) if pd.notna(start_date_range): vre_curt, avail_gen = set_timestamp_date_range( [vre_curt, avail_gen], start_date_range, end_date_range ) if vre_curt.empty: logger.warning("No curtailment in selected Date Range") continue # Calculates interval step to correct for MWh interval_count = get_sub_hour_interval_count(vre_curt) vre_curt = vre_curt / interval_count avail_gen = avail_gen / interval_count vre_table = self.year_scenario_grouper( vre_curt, scenario, groupby=scenario_groupby ).sum() avail_gen_table = self.year_scenario_grouper( avail_gen, scenario, groupby=scenario_groupby ).sum() vre_curt_chunks.append(vre_table) avail_gen_chunks.append(avail_gen_table) if not vre_curt_chunks: outputs[zone_input] = MissingZoneData() continue Total_Curtailment_out = pd.concat(vre_curt_chunks, axis=0, sort=False) Total_Available_gen = pd.concat(avail_gen_chunks, axis=0, sort=False) # if Total_Available_gen not included and all 0, # vre_pct_curt set to empty DataFrame if Total_Available_gen.to_numpy().sum() == 0: vre_pct_curt = pd.DataFrame() else: vre_pct_curt = Total_Curtailment_out.sum( axis=1 ) / Total_Available_gen.sum(axis=1) if Total_Curtailment_out.empty: outputs[zone_input] = MissingZoneData() continue # unit conversion return divisor and energy units unitconversion = self.capacity_energy_unitconversion( Total_Curtailment_out, self.Scenarios, sum_values=True ) Total_Curtailment_out = Total_Curtailment_out / unitconversion["divisor"] # Data table of values to return to main program Data_Table_Out = Total_Curtailment_out Data_Table_Out = Data_Table_Out.add_suffix(f" ({unitconversion['units']}h)") mplt = PlotLibrary() fig, ax = mplt.get_figure() # Set x-tick labels if self.custom_xticklabels: tick_labels = self.custom_xticklabels else: tick_labels = Total_Curtailment_out.index mplt.barplot( Total_Curtailment_out, color=self.marmot_color_dict, stacked=True, custom_tick_labels=tick_labels, ) ax.set_ylabel( f"Total Curtailment ({unitconversion['units']}h)", color="black", rotation="vertical", ) ax.margins(x=0.01) # Add legend mplt.add_legend(reverse_legend=True) # Add title if plot_data_settings["plot_title_as_region"]: mplt.add_main_title(zone_input) curt_totals = Total_Curtailment_out.sum(axis=1) # inserts total bar value above each bar for k, patch in enumerate(ax.patches): height = curt_totals[k] width = patch.get_width() x, y = patch.get_xy() if not vre_pct_curt.empty: ax.text( x + width / 2, y + height + 0.05 * max(ax.get_ylim()), "{:.2%}\n|{:,.2f}|".format(vre_pct_curt[k], curt_totals[k]), horizontalalignment="center", verticalalignment="center", fontsize=7, color="red", ) else: ax.text( x + width / 2, y + height + 0.05 * max(ax.get_ylim()), "|{:,.2f}|".format(curt_totals[k]), horizontalalignment="center", verticalalignment="center", fontsize=7, color="red", ) if k >= len(curt_totals) - 1: break outputs[zone_input] = {"fig": fig, "data_table": Data_Table_Out} return outputs
[docs] def curt_total_diff( self, start_date_range: str = None, end_date_range: str = None, **_ ): """Creates stacked barplots of total curtailment by technology relative to a base scenario. Barplots show the change in total curtailment relative to a base scenario. The default is to comapre against the first scenario provided in the inputs list. Args: start_date_range (str, optional): Defines a start date at which to represent data from. Defaults to None. end_date_range (str, optional): Defines a end date at which to represent data to. Defaults to None. Returns: dict: dictionary containing the created plot and its data table. """ return UnderDevelopment() outputs: dict = {} properties = [ (True, f"generator_{curtailment_prop}", self.Scenarios), (True, "generator_Available_Capacity", self.Scenarios), ] # Runs get_formatted_data within PlotDataStoreAndProcessor to populate PlotDataStoreAndProcessor dictionary # with all required properties, returns a 1 if required data is missing check_input_data = self.get_formatted_data(properties) # Checks if all data required by plot is available, if 1 in list required data is missing if 1 in check_input_data: outputs = MissingInputData() return outputs for zone_input in self.Zones: logger.info(self.AGG_BY + " = " + zone_input) Total_Curtailment_out = pd.DataFrame() Total_Available_gen = pd.DataFrame() vre_curt_chunks = [] avail_gen_chunks = [] for scenario in self.Scenarios: logger.info("Scenario = " + scenario) # Adjust list of values to drop from vre_gen_cat depending on if it exists in processed techs # self.gen_categories.vre = [name for name in self.gen_categories.vre if name in curtailment_collection.get(scenario).index.unique(level="tech")] vre_collection = {} avail_vre_collection = {} vre_curt = self[f"generator_{curtailment_prop}"].get(scenario) try: vre_curt = vre_curt.xs(zone_input, level=self.AGG_BY) except KeyError: logger.info("No curtailment in " + zone_input) continue vre_curt = self.df_process_gen_inputs(vre_curt) # If using Marmot's curtailment property if curtailment_prop == "Curtailment": vre_curt = self.assign_curtailment_techs( vre_curt, self.gen_categories.vre ) avail_gen = self["generator_Available_Capacity"].get(scenario) try: # Check for regions missing all generation. avail_gen = avail_gen.xs(zone_input, level=self.AGG_BY) except KeyError: logger.info("No available generation in " + zone_input) continue avail_gen = self.df_process_gen_inputs(avail_gen) # If using Marmot's curtailment property if curtailment_prop == "Curtailment": avail_gen = self.assign_curtailment_techs( avail_gen, self.gen_categories.vre ) for vre_type in self.gen_categories.vre: try: vre_curt_type = vre_curt[vre_type] except KeyError: logger.info("No " + vre_type + " in " + zone_input) continue vre_collection[vre_type] = float(vre_curt_type.sum()) avail_gen_type = avail_gen[vre_type] avail_vre_collection[vre_type] = float(avail_gen_type.sum()) vre_table = pd.DataFrame(vre_collection, index=[scenario]) avail_gen_table = pd.DataFrame(avail_vre_collection, index=[scenario]) vre_curt_chunks.append(vre_table) avail_gen_chunks.append(avail_gen_table) Total_Curtailment_out = pd.concat(vre_curt_chunks, axis=0, sort=False) Total_Available_gen = pd.concat(avail_gen_chunks, axis=0, sort=False) vre_pct_curt = Total_Curtailment_out.sum(axis=1) / Total_Available_gen.sum( axis=1 ) # Change to a diff on the first scenario. Total_Curtailment_out = Total_Curtailment_out - Total_Curtailment_out.xs( self.Scenarios[0] ) Total_Curtailment_out.drop( self.Scenarios[0], inplace=True ) # Drop base entry Total_Curtailment_out.index = Total_Curtailment_out.index.str.replace( "_", " " ) # Data table of values to return to main program Data_Table_Out = Total_Curtailment_out if Total_Curtailment_out.empty == True: outputs[zone_input] = MissingZoneData() continue # unit conversion return divisor and energy units unitconversion = self.capacity_energy_unitconversion( Total_Curtailment_out, self.Scenarios, sum_values=True ) Total_Curtailment_out = Total_Curtailment_out / unitconversion["divisor"] mplt = PlotLibrary() fig, ax = mplt.get_figure() Total_Curtailment_out.plot.bar( stacked=True, color=[ self.marmot_color_dict.get(x, "#333333") for x in Total_Curtailment_out.columns ], ax=ax, ) ax.set_ylabel( "Total Curtailment ({}h)".format(unitconversion["units"]), color="black", rotation="vertical", ) # Set x-tick labels if self.custom_xticklabels: tick_labels = self.custom_xticklabels else: tick_labels = Total_Curtailment_out.index mplt.set_barplot_xticklabels(tick_labels) ax.margins(x=0.01) if plot_data_settings["plot_title_as_region"]: mplt.add_main_title(zone_input) handles, labels = ax.get_legend_handles_labels() ax.legend( reversed(handles), reversed(labels), loc="lower left", bbox_to_anchor=(1, 0), facecolor="inherit", frameon=True, ) curt_totals = Total_Curtailment_out.sum(axis=1) # inserts total bar value above each bar k = 0 for i in ax.patches: height = curt_totals[k] width = i.get_width() x, y = i.get_xy() ax.text( x + width / 2, y + height + 0.05 * max(ax.get_ylim()), "{:.2%}\n|{:,.2f}|".format(vre_pct_curt[k], curt_totals[k]), horizontalalignment="center", verticalalignment="center", fontsize=11, color="red", ) k += 1 if k >= len(vre_pct_curt): break outputs[zone_input] = {"fig": fig, "data_table": Data_Table_Out} return outputs
[docs] def curt_ind( self, figure_name: str = None, prop: str = None, start_date_range: str = None, end_date_range: str = None, **_, ): """Curtailment as a percentage of total generation, of individual generators. The generators are specified as a comma seperated string in the fourth column of Marmot_plot_select.csv and is passed to the prop argument. The method outputs two.csv files: - one that contains curtailment, in percent, for each scenario and site. - the other contains total generation, in TWh, for each scenario and site. This method does not return data to MarmotPlot, data is saved within the method directly to the output folder. Args: figure_name (str, optional): User defined figure output name. Defaults to None. prop (str, optional): comma seperated string of generators to display. Defaults to None. start_date_range (str, optional): Defines a start date at which to represent data from. Defaults to None. end_date_range (str, optional): Defines a end date at which to represent data to. Defaults to None. Returns: dict: dictionary containing the created plot and its data table. """ outputs: dict = {} # List of properties needed by the plot, properties are a set of tuples and # contain 3 parts: required True/False, property name and scenarios required, # scenarios must be a list. properties = [ (True, "generator_Generation", self.Scenarios), (True, f"generator_{curtailment_prop}", self.Scenarios), ] # Runs get_formatted_data within PlotDataStoreAndProcessor to populate PlotDataStoreAndProcessor dictionary # with all required properties, returns a 1 if required data is missing check_input_data = self.get_formatted_data(properties) if 1 in check_input_data: return MissingInputData() Total_Curtailment_Out_perc = pd.DataFrame() Total_Curt = pd.DataFrame() Total_Gen = pd.DataFrame() scen_idx = -1 chunks = [] for scenario in self.Scenarios: scen_idx += 1 logger.info(f"Scenario = {scenario}") vre_curt: pd.DataFrame = self[f"generator_{curtailment_prop}"].get(scenario) gen: pd.DataFrame = self["generator_Generation"].get(scenario) # Select only lines specified in Marmot_plot_select.csv. select_sites = prop.split(",") select_sites = [ site[1:] if site[0] == " " else site for site in select_sites ] logger.info( "Plotting curtailment only for sites specified in Marmot_plot_select.csv" ) site_idx = -1 sites_chunk = [] sites_gen_chunk = [] curt_tots_chunk = [] chunks_scen = [] ti = gen.index.get_level_values("timestamp").unique() for site in select_sites: if site in gen.index.get_level_values("gen_name").unique(): site_idx += 1 gen_site = gen.xs(site, level="gen_name") curt = vre_curt.xs(site, level="gen_name") if pd.notna(start_date_range): gen_site, curt = set_timestamp_date_range( [gen_site, curt], start_date_range, end_date_range ) if curt.empty: logger.warning("No curtailment in selected Date Range") continue curt_tot = curt.sum() gen_tot = gen_site.sum() curt_perc = pd.Series(curt_tot / gen_tot) levels2drop = [ level for level in gen_site.index.names if level != "timestamp" ] gen_site = gen_site.droplevel(levels2drop) else: curt_perc = pd.Series([0]) curt_tot = pd.Series([0]) gen_tot = pd.Series([0]) curt = pd.Series([0] * len(ti), name=site, index=ti) gen_tot.columns = [site] curt_perc.columns = [site] curt_tot.columns = [site] curt.columns = [site] sites_gen_chunk.append(gen_tot) sites_chunk.append(curt_perc) curt_tots_chunk.append(curt_tot) chunks_scen.append(curt) if not chunks_scen: outputs = MissingInputData() continue curt_8760_scen = pd.concat(chunks_scen, axis=1) scen_name = pd.Series([scenario] * len(curt_8760_scen), name="Scenario") curt_8760_scen = curt_8760_scen.set_index([scen_name], append=True) chunks.append(curt_8760_scen) sites_gen = pd.concat(sites_gen_chunk) sites = pd.concat(sites_chunk) curt_tots = pd.concat(curt_tots_chunk) sites.name = scenario sites.index = select_sites curt_tots.name = scenario curt_tots.index = select_sites sites_gen.name = scenario sites_gen.index = select_sites Total_Curtailment_Out_perc = pd.concat( [Total_Curtailment_Out_perc, sites], axis=1 ) Total_Gen = pd.concat([Total_Gen, sites_gen], axis=1) Total_Curt = pd.concat([Total_Curt, curt_tots], axis=1) if not chunks: return MissingInputData() Curt_8760 = pd.concat(chunks, axis=0, copy=False) Curt_8760.to_csv( self.figure_folder.joinpath( self.AGG_BY + "_curtailment", figure_name + "_8760.csv" ) ) Total_Gen = Total_Gen / 1000000 Total_Curtailment_Out_perc.T.to_csv( self.figure_folder.joinpath( self.AGG_BY + "_curtailment", figure_name + ".csv" ) ) Total_Gen.T.to_csv( self.figure_folder.joinpath( self.AGG_BY + "_curtailment", figure_name + "_gen.csv" ) ) mplt = PlotLibrary(figsize=(9, 6)) fig, ax = mplt.get_figure() mplt.barplot(Total_Curtailment_Out_perc, color=self.color_list) mplt.add_legend() ax.set_ylabel("Curtailment (%)", color="black", rotation="vertical") unitconversion = self.capacity_energy_unitconversion(Total_Curt, self.Scenarios) Total_Curt = Total_Curt / unitconversion["divisor"] Total_Curt = round(Total_Curt, 2) Total_Curt = Total_Curt.melt() # inserts total bar value above each bar, # but only if it is the max in the bar group. # to do this, take the n highest patches, where n is the number of bar broups (select_sites) heights = [patch.get_height() for patch in ax.patches] heights.sort(reverse=True) toph = heights[0 : len(select_sites)] for k, patch in enumerate(ax.patches): height = patch.get_height() if height in toph: width = patch.get_width() x, y = patch.get_xy() ax.text( x + width / 2, y + height + 0.05 * max(ax.get_ylim()), str(format(Total_Curt.iloc[k][1], ".2f")) + f" {unitconversion['units']}h", horizontalalignment="center", verticalalignment="center", fontsize=11, ) fig.savefig( self.figure_folder.joinpath( self.AGG_BY + "_curtailment", figure_name + ".svg" ), dpi=600, bbox_inches="tight", ) outputs = DataSavedInModule() return outputs
[docs] def average_diurnal_curt( self, timezone: str = None, start_date_range: str = None, end_date_range: str = None, scenario_groupby: str = "Scenario", **_, ): """Average diurnal renewable curtailment plot. Each scenario is plotted as a separate line and shows the average hourly curtailment over a 24 hour period averaged across the entire year or time period defined. Args: timezone (str, optional): The timezone to display on the x-axes. Defaults to None. start_date_range (str, optional): Defines a start date at which to represent data from. Defaults to None. end_date_range (str, optional): Defines a end date at which to represent data to. Defaults to None. scenario_groupby (str, optional): Specifies whether to group data by Scenario or Year-Sceanrio. If grouping by Year-Sceanrio the year will be identified from the timestamp and appeneded to the sceanrio name. This is useful when plotting data which covers multiple years such as ReEDS. Defaults to Scenario. .. versionadded:: 0.10.0 Returns: dict: dictionary containing the created plot and its data table. """ outputs: dict = {} # List of properties needed by the plot, properties are a set of tuples and # contain 3 parts: required True/False, property name and scenarios required, # scenarios must be a list. properties = [(True, f"generator_{curtailment_prop}", self.Scenarios)] # Runs get_formatted_data within PlotDataStoreAndProcessor to populate PlotDataStoreAndProcessor dictionary # with all required properties, returns a 1 if required data is missing check_input_data = self.get_formatted_data(properties) if 1 in check_input_data: return MissingInputData() for zone_input in self.Zones: logger.info(f"{self.AGG_BY} = {zone_input}") chunks = [] for scenario in self.Scenarios: logger.info(f"Scenario = {scenario}") re_curt = self[f"generator_{curtailment_prop}"].get(scenario) try: re_curt = re_curt.xs(zone_input, level=self.AGG_BY) except KeyError: logger.info(f"No curtailment in {zone_input}") continue re_curt = self.df_process_gen_inputs(re_curt) # If using Marmot's curtailment property if curtailment_prop == "Curtailment": re_curt = self.assign_curtailment_techs( re_curt, self.gen_categories.vre ) # Sum across technologies re_curt = re_curt.sum(axis=1) if pd.notna(start_date_range): re_curt = set_timestamp_date_range( re_curt, start_date_range, end_date_range ) if re_curt.empty: logger.warning("No curtailment in selected Date Range") continue interval_count = get_sub_hour_interval_count(re_curt) re_curt = re_curt / interval_count # Group data by hours and find mean across entire range re_curt = self.year_scenario_grouper( re_curt, scenario, groupby=scenario_groupby, additional_groups=[re_curt.index.hour], ).mean() for scen in re_curt.index.get_level_values("Scenario").unique(): re_curt_scen = re_curt.xs(scen, level="Scenario") # If hours are missing, fill with 0 if len(re_curt_scen) < 24: re_idx = range(0, 24) re_curt_scen = re_curt_scen.reindex(re_idx, fill_value=0) # reset index to datetime re_curt_scen.index = pd.date_range( "2024-01-01", periods=24, freq="H" ) re_curt_scen.rename(scen, inplace=True) chunks.append(re_curt_scen) # No curtailment data in zone if not chunks: outputs[zone_input] = MissingZoneData() continue RE_Curtailment_DC = pd.concat(chunks, axis=1, sort=False) # Create Dictionary from scenario names and color list colour_dict = dict(zip(RE_Curtailment_DC.columns, self.color_list)) mplt = SetupSubplot() fig, ax = mplt.get_figure() unitconversion = self.capacity_energy_unitconversion( RE_Curtailment_DC, self.Scenarios ) RE_Curtailment_DC = RE_Curtailment_DC / unitconversion["divisor"] Data_Table_Out = RE_Curtailment_DC.copy() Data_Table_Out.index = pd.date_range( "2024-01-01", periods=24, freq="H" ).time Data_Table_Out = Data_Table_Out.add_suffix(f" ({unitconversion['units']})") for column in RE_Curtailment_DC: ax.plot( RE_Curtailment_DC[column], linewidth=2, color=colour_dict[column], label=column, ) mplt.set_yaxis_major_tick_format() # Add legend mplt.add_legend() # Set time ticks mplt.set_subplot_timeseries_format(zero_formats_3="%H:%M") ax.set_ylabel( f"Average Diurnal Curtailment ({unitconversion['units']})", color="black", rotation="vertical", ) ax.margins(x=0.01) ax.set_ylim(bottom=0) # Add title if plot_data_settings["plot_title_as_region"]: mplt.add_main_title(zone_input) outputs[zone_input] = {"fig": fig, "data_table": Data_Table_Out} return outputs