# -*- coding: utf-8 -*-
"""Locational price analysis plots.
Price analysis plots, price duration curves and timeseries plots.
Prices plotted in $/MWh
@author: adyreson and Daniel Levie
"""
import logging
from pathlib import Path
from typing import List
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import marmot.utils.mconfig as mconfig
from marmot.plottingmodules.plotutils.plot_data_helper import (
PlotDataStoreAndProcessor,
set_facet_col_row_dimensions,
set_x_y_dimension,
)
from marmot.plottingmodules.plotutils.plot_exceptions import (
DataSavedInModule,
InputSheetError,
MissingInputData,
)
from marmot.plottingmodules.plotutils.plot_library import SetupSubplot
from marmot.plottingmodules.plotutils.styles import ColorList
from marmot.plottingmodules.plotutils.timeseries_modifiers import (
set_timestamp_date_range,
)
logger = logging.getLogger("plotter." + __name__)
plot_data_settings: dict = mconfig.parser("plot_data")
[docs]class Prices(PlotDataStoreAndProcessor):
"""Locational price analysis plots.
The price.py module contains methods that are
related to grid prices at regions, zones, nodes etc.
Prices inherits from the PlotDataStoreAndProcessor class to assist
in creating figures.
"""
def __init__(
self,
Zones: List[str],
Scenarios: List[str],
AGG_BY: str,
ordered_gen: List[str],
marmot_solutions_folder: Path,
color_list: list = ColorList().colors,
ylabels: List[str] = None,
xlabels: List[str] = None,
**kwargs,
):
"""
Args:
Zones (List[str]): List of regions/zones to plot.
Scenarios (List[str]): List of scenarios to plot.
AGG_BY (str): Informs region type to aggregate by when creating plots.
ordered_gen (List[str]): Ordered list of generator technologies to plot,
order defines the generator technology position in stacked bar and area plots.
marmot_solutions_folder (Path): Directory containing Marmot solution outputs.
color_list (list, optional): List of colors to apply to non-gen plots.
Defaults to ColorList().colors.
ylabels (List[str], optional): y-axis labels for facet plots.
Defaults to None.
xlabels (List[str], optional): x-axis labels for facet plots.
Defaults to None.
"""
# Instantiation of PlotDataStoreAndProcessor
super().__init__(AGG_BY, ordered_gen, marmot_solutions_folder, **kwargs)
self.Zones = Zones
self.Scenarios = Scenarios
self.color_list = color_list
self.ylabels = ylabels
self.xlabels = xlabels
[docs] def pdc_all_regions(
self,
y_axis_max: float = None,
start_date_range: str = None,
end_date_range: str = None,
**_,
):
"""Creates a price duration curve for all regions/zones and plots them on a single facet plot.
Price is in $/MWh.
The code automatically creates a facet plot based on the number of regions/zones in the input.
All scenarios are plotted on a single facet for each region/zone
Args:
y_axis_max (float, optional): Max y-axis value.
Defaults to None.
start_date_range (str, optional): Defines a start date at which to represent data from.
Defaults to None.
end_date_range (str, optional): Defines a end date at which to represent data to.
Defaults to None.
Returns:
dict: dictionary containing the created plot and its data table
"""
if self.AGG_BY == "zone":
agg = "zone"
else:
agg = "region"
outputs: dict = {}
# List of properties needed by the plot, properties are a set of tuples and
# contain 3 parts: required True/False, property name and scenarios required,
# scenarios must be a list.
properties = [(True, f"{agg}_Price", self.Scenarios)]
# Runs get_formatted_data within PlotDataStoreAndProcessor to populate PlotDataStoreAndProcessor dictionary
# with all required properties, returns a 1 if required data is missing
check_input_data = self.get_formatted_data(properties)
if 1 in check_input_data:
return MissingInputData()
# Location to save to
save_figures: Path = self.figure_folder.joinpath(f"{self.AGG_BY}_prices")
region_number = len(self.Zones)
# determine x,y length for plot
ncols, nrows = set_x_y_dimension(region_number)
grid_size = ncols * nrows
# Used to calculate any excess axis to delete
excess_axs = grid_size - region_number
# setup plot
mplt = SetupSubplot(nrows, ncols, sharey=True, squeeze=False, ravel_axs=True)
fig, axs = mplt.get_figure()
plt.subplots_adjust(wspace=0.1, hspace=0.50)
data_table = []
for n, zone_input in enumerate(self.Zones):
all_prices = []
for scenario in self.Scenarios:
price = self._process_data(self[f"{agg}_Price"], scenario, zone_input)
price = price.groupby(["timestamp"]).sum()
if pd.notna(start_date_range):
price = set_timestamp_date_range(
price, start_date_range, end_date_range
)
price.sort_values(by=scenario, ascending=False, inplace=True)
price.reset_index(drop=True, inplace=True)
all_prices.append(price)
duration_curve = pd.concat(all_prices, axis=1)
duration_curve.columns = duration_curve.columns.str.replace("_", " ")
data_out = duration_curve.copy()
data_out.columns = [zone_input + "_" + str(col) for col in data_out.columns]
data_table.append(data_out)
color_dict = dict(zip(duration_curve.columns, self.color_list))
for column in duration_curve:
axs[n].plot(
duration_curve[column],
linewidth=1,
color=color_dict[column],
label=column,
alpha=1,
)
if pd.notna(y_axis_max):
axs[n].set_ylim(bottom=0, top=float(y_axis_max))
axs[n].set_xlim(0, len(duration_curve))
axs[n].set_title(zone_input.replace("_", " "))
mplt.add_legend()
# Remove extra axes
mplt.remove_excess_axs(excess_axs, grid_size)
plt.ylabel(
f"{self.AGG_BY} Price ($/MWh)",
color="black",
rotation="vertical",
labelpad=30,
)
plt.xlabel("Intervals", color="black", rotation="horizontal", labelpad=20)
Data_Table_Out = pd.concat(data_table, axis=1)
Data_Table_Out = Data_Table_Out.add_suffix(" ($/MWh)")
fig.savefig(
save_figures.joinpath("Price_Duration_Curve_All_Regions.svg"),
dpi=600,
bbox_inches="tight",
)
Data_Table_Out.to_csv(
save_figures.joinpath("Price_Duration_Curve_All_Regions.csv")
)
outputs = DataSavedInModule()
return outputs
[docs] def region_pdc(
self,
figure_name: str = None,
y_axis_max: float = None,
start_date_range: str = None,
end_date_range: str = None,
**_,
):
"""Creates a price duration curve for each region. Price in $/MWh
The code will create either a facet plot or a single plot depending on
if the Facet argument is active.
If a facet plot is created, each scenario is plotted on a separate facet,
otherwise all scenarios are plotted on a single plot.
To make a facet plot, ensure the word 'Facet' is found in the figure_name.
Args:
figure_name (str, optional): User defined figure output name.
Defaults to None.
y_axis_max (float, optional): Max y-axis value.
Defaults to None.
start_date_range (str, optional): Defines a start date at which to represent data from.
Defaults to None.
end_date_range (str, optional): Defines a end date at which to represent data to.
Defaults to None.
Returns:
dict: dictionary containing the created plot and its data table
"""
outputs: dict = {}
facet = False
if "Facet" in figure_name:
facet = True
if self.AGG_BY == "zone":
agg = "zone"
else:
agg = "region"
# List of properties needed by the plot, properties are a set of tuples and
# contain 3 parts: required True/False, property name and scenarios required,
# scenarios must be a list.
properties = [(True, f"{agg}_Price", self.Scenarios)]
# Runs get_formatted_data within PlotDataStoreAndProcessor to populate PlotDataStoreAndProcessor dictionary
# with all required properties, returns a 1 if required data is missing
check_input_data = self.get_formatted_data(properties)
if 1 in check_input_data:
return MissingInputData()
for zone_input in self.Zones:
logger.info(f"{self.AGG_BY} = {zone_input}")
all_prices = []
for scenario in self.Scenarios:
price = self._process_data(self[f"{agg}_Price"], scenario, zone_input)
price = price.groupby(["timestamp"]).sum()
if pd.notna(start_date_range):
price = set_timestamp_date_range(
price, start_date_range, end_date_range
)
price.sort_values(by=scenario, ascending=False, inplace=True)
price.reset_index(drop=True, inplace=True)
all_prices.append(price)
duration_curve = pd.concat(all_prices, axis=1)
duration_curve.columns = duration_curve.columns.str.replace("_", " ")
Data_Out = duration_curve.add_suffix(" ($/MWh)")
if not self.xlabels:
ncols = 1
else:
ncols = len(self.xlabels)
if not self.ylabels:
nrows = 1
else:
nrows = len(self.ylabels)
# If the plot is not a facet plot, grid size should be 1x1
if not facet:
ncols = 1
nrows = 1
color_dict = dict(zip(duration_curve.columns, self.color_list))
# setup plot
mplt = SetupSubplot(
nrows, ncols, sharey=True, squeeze=False, ravel_axs=True
)
fig, axs = mplt.get_figure()
plt.subplots_adjust(wspace=0.05, hspace=0.2)
n = 0
for column in duration_curve:
axs[n].plot(
duration_curve[column],
linewidth=1,
color=color_dict[column],
label=column,
alpha=1,
)
if pd.notna(y_axis_max):
axs[n].set_ylim(bottom=0, top=float(y_axis_max))
axs[n].set_xlim(0, len(duration_curve))
if facet:
n += 1
mplt.add_legend()
plt.ylabel(
f"{self.AGG_BY} Price ($/MWh)",
color="black",
rotation="vertical",
labelpad=20,
)
plt.xlabel("Intervals", color="black", rotation="horizontal", labelpad=20)
if plot_data_settings["plot_title_as_region"]:
mplt.add_main_title(zone_input)
outputs[zone_input] = {"fig": fig, "data_table": Data_Out}
return outputs
[docs] def region_timeseries_price(
self,
figure_name: str = None,
y_axis_max: float = None,
timezone: str = "",
start_date_range: str = None,
end_date_range: str = None,
**_,
):
"""Creates price timeseries line plot for each region. Price is $/MWh.
The code will create either a facet plot or a single plot depending on
if the Facet argument is active.
If a facet plot is created, each scenario is plotted on a separate facet,
otherwise all scenarios are plotted on a single plot.
To make a facet plot, ensure the work 'Facet' is found in the figure_name.
Args:
figure_name (str, optional): User defined figure output name.
Defaults to None.
y_axis_max (float, optional): Max y-axis value.
Defaults to None.
timezone (str, optional): The timezone to display on the x-axes.
Defaults to "".
start_date_range (str, optional): Defines a start date at which to represent data from.
Defaults to None.
end_date_range (str, optional): Defines a end date at which to represent data to.
Defaults to None.
Returns:
dict: dictionary containing the created plot and its data table
"""
outputs: dict = {}
facet = False
if "Facet" in figure_name:
facet = True
if self.AGG_BY == "zone":
agg = "zone"
else:
agg = "region"
# List of properties needed by the plot, properties are a set of tuples and
# contain 3 parts: required True/False, property name and scenarios required,
# scenarios must be a list.
properties = [(True, f"{agg}_Price", self.Scenarios)]
# Runs get_formatted_data within PlotDataStoreAndProcessor to populate PlotDataStoreAndProcessor dictionary
# with all required properties, returns a 1 if required data is missing
check_input_data = self.get_formatted_data(properties)
if 1 in check_input_data:
return MissingInputData()
for zone_input in self.Zones:
logger.info(f"{self.AGG_BY} = {zone_input}")
all_prices = []
for scenario in self.Scenarios:
price = self._process_data(self[f"{agg}_Price"], scenario, zone_input)
price = price.groupby(["timestamp"]).sum()
if pd.notna(start_date_range):
price = set_timestamp_date_range(
price, start_date_range, end_date_range
)
all_prices.append(price)
timeseries = pd.concat(all_prices, axis=1)
timeseries.columns = timeseries.columns.str.replace("_", " ")
Data_Out = timeseries.add_suffix(" ($/MWh)")
if not self.xlabels:
ncols = 1
else:
ncols = len(self.xlabels)
if not self.ylabels:
nrows = 1
else:
nrows = len(self.ylabels)
# If the plot is not a facet plot, grid size should be 1x1
if not facet:
ncols = 1
nrows = 1
color_dict = dict(zip(timeseries.columns, self.color_list))
# setup plot
mplt = SetupSubplot(
nrows, ncols, sharey=True, squeeze=False, ravel_axs=True
)
fig, axs = mplt.get_figure()
plt.subplots_adjust(wspace=0.05, hspace=0.2)
n = 0 # Counter for scenario subplots
for column in timeseries:
axs[n].plot(
timeseries[column],
linewidth=1,
color=color_dict[column],
label=column,
alpha=1,
)
if pd.notna(y_axis_max):
axs[n].set_ylim(bottom=0, top=float(y_axis_max))
mplt.set_subplot_timeseries_format(sub_pos=n)
if facet:
n += 1
# Add legend
mplt.add_legend()
# Add title
if plot_data_settings["plot_title_as_region"]:
mplt.add_main_title(zone_input)
plt.ylabel(
f"{self.AGG_BY} Price ($/MWh)",
color="black",
rotation="vertical",
labelpad=20,
)
plt.xlabel(timezone, color="black", rotation="horizontal", labelpad=20)
outputs[zone_input] = {"fig": fig, "data_table": Data_Out}
return outputs
[docs] def timeseries_price_all_regions(
self,
y_axis_max: float = None,
timezone: str = "",
start_date_range: str = None,
end_date_range: str = None,
**_,
):
"""Creates a price timeseries plot for all regions/zones and plots them on a single facet plot.
Price in $/MWh.
The code automatically creates a facet plot based on the number of regions/zones in the input.
All scenarios are plotted on a single facet for each region/zone.
Args:
y_axis_max (float, optional): Max y-axis value.
Defaults to None.
timezone (str, optional): The timezone to display on the x-axes.
Defaults to "".
start_date_range (str, optional): Defines a start date at which to represent data from.
Defaults to None.
end_date_range (str, optional): Defines a end date at which to represent data to.
Defaults to None.
Returns:
dict: dictionary containing the created plot and its data table.
"""
outputs: dict = {}
if self.AGG_BY == "zone":
agg = "zone"
else:
agg = "region"
# List of properties needed by the plot, properties are a set of tuples and
# contain 3 parts: required True/False, property name and scenarios required,
# scenarios must be a list.
properties = [(True, f"{agg}_Price", self.Scenarios)]
# Runs get_formatted_data within PlotDataStoreAndProcessor to populate PlotDataStoreAndProcessor dictionary
# with all required properties, returns a 1 if required data is missing
check_input_data = self.get_formatted_data(properties)
if 1 in check_input_data:
return MissingInputData()
# Location to save to
save_figures: Path = self.figure_folder.joinpath(f"{self.AGG_BY}_prices")
region_number = len(self.Zones)
ncols, nrows = set_x_y_dimension(region_number)
grid_size = ncols * nrows
# Used to calculate any excess axis to delete
excess_axs = grid_size - region_number
# setup plot
mplt = SetupSubplot(nrows, ncols, sharey=True, squeeze=False, ravel_axs=True)
fig, axs = mplt.get_figure()
plt.subplots_adjust(wspace=0.1, hspace=0.70)
data_table = []
for n, zone_input in enumerate(self.Zones):
logger.info(f"{self.AGG_BY} = {zone_input}")
all_prices = []
for scenario in self.Scenarios:
price = self._process_data(self[f"{agg}_Price"], scenario, zone_input)
price = price.groupby(["timestamp"]).sum()
if pd.notna(start_date_range):
price = set_timestamp_date_range(
price, start_date_range, end_date_range
)
all_prices.append(price)
timeseries = pd.concat(all_prices, axis=1)
timeseries.columns = timeseries.columns.str.replace("_", " ")
data_out = timeseries.copy()
data_out.columns = [zone_input + "_" + str(col) for col in data_out.columns]
data_table.append(data_out)
color_dict = dict(zip(timeseries.columns, self.color_list))
for column in timeseries:
axs[n].plot(
timeseries[column],
linewidth=1,
color=color_dict[column],
label=column,
alpha=1,
)
axs[n].set_title(zone_input.replace("_", " "))
if pd.notna(y_axis_max):
axs[n].set_ylim(bottom=0, top=float(y_axis_max))
mplt.set_subplot_timeseries_format(sub_pos=n)
# Add legend
mplt.add_legend()
# Remove extra axes
mplt.remove_excess_axs(excess_axs, grid_size)
plt.ylabel(
f"{self.AGG_BY} Price ($/MWh)",
color="black",
rotation="vertical",
labelpad=30,
)
plt.xlabel(timezone, color="black", rotation="horizontal", labelpad=20)
Data_Table_Out = pd.concat(data_table, axis=1)
Data_Table_Out = Data_Table_Out.add_suffix(" ($/MWh)")
fig.savefig(
save_figures.joinpath("Price_Timeseries_All_Regions.svg"),
dpi=600,
bbox_inches="tight",
)
Data_Table_Out.to_csv(save_figures.joinpath("Price_Timeseries_All_Regions.csv"))
outputs = DataSavedInModule()
return outputs
[docs] def node_pdc(self, **kwargs):
"""Creates a price duration curve for a set of specifc nodes.
Price in $/MWh.
The code will create either a facet plot or a single plot depending on
the number of nodes included in plot_select.csv property entry.
Returns:
DataSavedInModule: DataSavedInModule exception.
"""
outputs = self._node_price(PDC=True, **kwargs)
return outputs
[docs] def node_timeseries_price(self, **kwargs):
"""Creates a price timeseries plot for a set of specifc nodes.
Price in $/MWh.
The code will create either a facet plot or a single plot depending on
the number of nodes included in plot_select.csv property entry.
Returns:
DataSavedInModule: DataSavedInModule exception.
"""
outputs = self._node_price(**kwargs)
return outputs
def _node_price(
self,
PDC: bool = False,
figure_name: str = None,
prop: str = None,
y_axis_max: float = None,
timezone: str = "",
start_date_range: str = None,
end_date_range: str = None,
**_,
):
"""Creates a price duration curve or timeseries plot for a set of specifc nodes.
This method is called from either node_pdc() or node_timeseries_price()
If PDC == True, a price duration curve plot will be created
The code will create either a facet plot or a single plot depending on
the number of nodes included in plot_select.csv property entry.
Plots and Data are saved within the module
Args:
PDC (bool, optional): If True creates a price duration curve.
Defaults to False.
figure_name (str, optional): User defined figure output name.
Defaults to None.
prop (str, optional): comma seperated string of nodes to display.
Defaults to None.
y_axis_max (float, optional): Max y-axis value.
Defaults to None.
timezone (str, optional): The timezone to display on the x-axes.
Defaults to "".
start_date_range (str, optional): Defines a start date at which to represent data from.
Defaults to None.
end_date_range (str, optional): Defines a end date at which to represent data to.
Defaults to None.
Returns:
DataSavedInModule: DataSavedInModule exception.
"""
# List of properties needed by the plot, properties are a set of tuples and
# contain 3 parts: required True/False, property name and scenarios required,
# scenarios must be a list.
properties = [(True, "node_Price", self.Scenarios)]
# Runs get_formatted_data within PlotDataStoreAndProcessor to populate PlotDataStoreAndProcessor dictionary
# with all required properties, returns a 1 if required data is missing
check_input_data = self.get_formatted_data(properties)
if 1 in check_input_data:
return MissingInputData()
node_figure_folder: Path = self.figure_folder.joinpath("node_prices")
node_figure_folder.mkdir(exist_ok=True)
# Select only node specified in Marmot_plot_select.csv.
select_nodes = prop.split(",")
if select_nodes == None:
return InputSheetError()
logger.info(f"Plotting Prices for {select_nodes}")
all_prices = []
for scenario in self.Scenarios:
logger.info(f"Scenario = {scenario}")
price: pd.DataFrame = self["node_Price"][scenario]
price = price.loc[(slice(None), select_nodes), :]
price = price.groupby(["timestamp", "node"]).sum()
price.rename(columns={"values": scenario}, inplace=True)
if pd.notna(start_date_range):
price = set_timestamp_date_range(
price, start_date_range, end_date_range
)
if PDC:
price.sort_values(by=["node", scenario], ascending=False, inplace=True)
price.reset_index("timestamp", drop=True, inplace=True)
all_prices.append(price)
pdc = pd.concat(all_prices, axis=1)
pdc.columns = pdc.columns.str.replace("_", " ")
Data_Out = pdc.add_suffix(" ($/MWh)")
ncols, nrows = set_x_y_dimension(len(select_nodes))
# setup plot
mplt = SetupSubplot(nrows, ncols, sharey=True, squeeze=False, ravel_axs=True)
fig, axs = mplt.get_figure()
plt.subplots_adjust(wspace=0.1, hspace=0.70)
color_dict = dict(zip(pdc.columns, self.color_list))
for n, node in enumerate(select_nodes):
if PDC:
try:
node_pdc = pdc.xs(node)
node_pdc.reset_index(drop=True, inplace=True)
except KeyError:
logger.info(f"{node} not found")
continue
else:
try:
node_pdc = pdc.xs(node, level="node")
except KeyError:
logger.info(f"{node} not found")
continue
for column in node_pdc:
axs[n].plot(
node_pdc[column],
linewidth=1,
color=color_dict[column],
label=column,
alpha=1,
)
if pd.notna(y_axis_max):
axs[n].set_ylim(bottom=0, top=float(y_axis_max))
if not PDC:
mplt.set_subplot_timeseries_format(sub_pos=n)
# axs[n].set_xlim(0,len(node_pdc))
mplt.add_legend()
plt.ylabel(
"Node Price ($/MWh)", color="black", rotation="vertical", labelpad=30
)
if PDC:
plt.xlabel("Intervals", color="black", rotation="horizontal", labelpad=20)
else:
plt.xlabel(timezone, color="black", rotation="horizontal", labelpad=20)
fig.savefig(
node_figure_folder.joinpath(f"{figure_name}.svg"),
dpi=600,
bbox_inches="tight",
)
Data_Out.to_csv(node_figure_folder.joinpath(f"{figure_name}.csv"))
outputs = DataSavedInModule()
return outputs
[docs] def node_price_hist(self, **kwargs):
"""Creates a price histogram for a specifc nodes. Price in $/MWh.
A facet plot will be created if more than one scenario are included on the
user input sheet
Each scenario will be plotted on a separate subplot.
If a set of nodes are passed at input, each will be saved to a separate
figure with node name as a suffix.
Plots and Data are saved within the module
Returns:
DataSavedInModule: DataSavedInModule exception.
"""
outputs = self._node_hist(**kwargs)
return outputs
[docs] def node_price_hist_diff(self, **kwargs):
"""Creates a difference price histogram for a specifc nodes. Price in $/MWh.
This plot requires more than one scenario to display correctly.
A facet plot will be created
Each scenario will be plotted on a separate subplot, with values displaying
the relative difference to the first scenario in the list.
If a set of nodes are passed at input, each will be saved to a separate
figure with node name as a suffix.
Plots and Data are saved within the module
Returns:
DataSavedInModule: DataSavedInModule exception.
"""
outputs = self._node_hist(diff_plot=True, **kwargs)
return outputs
def _node_hist(
self,
diff_plot: bool = False,
figure_name: str = None,
prop: str = None,
start_date_range: str = None,
end_date_range: str = None,
**_,
):
"""Internal code for hist plots.
Called from node_price_hist() or node_price_hist_diff().
Hist range and bin size is currently hardcoded from -100 to +100
with a bin width of 2.5 $/MWh
Args:
diff_plot (bool, optional): If True creates a diff plot.
Defaults to False.
figure_name (str, optional): User defined figure output name.
Defaults to None.
prop (str, optional): comma seperated string of nodes to display.
Defaults to None.
start_date_range (str, optional): Defines a start date at which to represent data from.
Defaults to None.
end_date_range (str, optional): Defines a end date at which to represent data to.
Defaults to None.
Returns:
DataSavedInModule: DataSavedInModule exception.
"""
# List of properties needed by the plot, properties are a set of tuples and
# contain 3 parts: required True/False, property name and scenarios required,
# scenarios must be a list.
properties = [(True, "node_Price", self.Scenarios)]
# Runs get_formatted_data within PlotDataStoreAndProcessor to populate PlotDataStoreAndProcessor dictionary
# with all required properties, returns a 1 if required data is missing
check_input_data = self.get_formatted_data(properties)
if 1 in check_input_data:
return MissingInputData()
node_figure_folder: Path = self.figure_folder.joinpath("node_prices")
node_figure_folder.mkdir(exist_ok=True)
# Select only node specified in Marmot_plot_select.csv.
select_nodes = prop.split(",")
if select_nodes == None:
return InputSheetError()
for node in select_nodes:
logger.info(f"Plotting Prices for Node: {node}")
all_prices = []
for scenario in self.Scenarios:
logger.info(f"Scenario = {scenario}")
price: pd.DataFrame = self["node_Price"][scenario]
try:
price = price.xs(node, level="node")
except KeyError:
logger.info(f"{node} not found")
continue
price = price.groupby(["timestamp"]).sum()
price.rename(columns={"values": scenario}, inplace=True)
if pd.notna(start_date_range):
price = set_timestamp_date_range(
price, start_date_range, end_date_range
)
price.reset_index("timestamp", drop=True, inplace=True)
all_prices.append(price)
if not all_prices:
logger.info(f"Nodes not found in database, input sheet error likely!")
return InputSheetError()
p_hist = pd.concat(all_prices, axis=1)
if diff_plot:
p_hist = p_hist.subtract(p_hist[f"{self.Scenarios[0]}"], axis=0)
p_hist.columns = p_hist.columns.str.replace("_", " ")
data_out = p_hist.add_suffix(" ($/MWh)")
ncols, nrows = set_facet_col_row_dimensions(
self.xlabels, self.ylabels, multi_scenario=self.Scenarios
)
grid_size = ncols * nrows
# Used to calculate any excess axis to delete
plot_number = len(self.Scenarios)
excess_axs = grid_size - plot_number
# setup plot
mplt = SetupSubplot(
nrows, ncols, sharey=True, squeeze=False, ravel_axs=True
)
fig, axs = mplt.get_figure()
plt.subplots_adjust(wspace=0.1, hspace=0.25)
color_dict = dict(zip(p_hist.columns, self.color_list))
# max, min values in histogram range and bin width
# TODO: Determine a way to pass the following as an input.
range_max = 100
range_min = -100
bin_width = 2.5
# no of bines
bins = int((range_max + abs(range_min)) / bin_width)
for n, column in enumerate(p_hist):
# Set plot data equal to 0 if all zero, e.g diff plot
if sum(p_hist[column]) == 0:
data = 0
else:
data = p_hist[column]
# values above range_max and below range_min are binned together
axs[n].hist(
np.clip(data, range_min, range_max),
bins=bins,
range=(range_min, range_max),
color=color_dict[column],
zorder=2,
rwidth=0.8,
)
# get xlabels and edit them
xticks = axs[n].get_xticks()
# min range_min, max range_max
xticks = np.unique(np.clip(xticks, range_min, range_max))
xlabels = xticks.astype(int).astype(str)
# adds a '+' to final xlabel
xlabels[-1] += "+"
if min(xticks) < 0:
xlabels[0] += "-"
# sets x_tick spacing
axs[n].set_xticks(xticks)
axs[n].set_xticklabels(xlabels)
# Remove extra axes
mplt.remove_excess_axs(excess_axs, grid_size)
# Add Facet Labels
mplt.add_facet_labels(xlabels=self.xlabels, ylabels=self.ylabels)
plt.ylabel(
"Occurrence",
color="black",
rotation="vertical",
labelpad=60,
fontsize=24,
)
mplt.add_main_title(node)
if diff_plot:
plt.xlabel(
f"Node LMP Change ($/MWh) relative to {self.Scenarios[0].replace('_',' ')}",
color="black",
labelpad=40,
)
else:
plt.xlabel("Node LMP ($/MWh)", color="black", labelpad=40)
fig.savefig(
node_figure_folder.joinpath(f"{figure_name}_{node}.svg"),
dpi=600,
bbox_inches="tight",
)
data_out.to_csv(node_figure_folder.joinpath(f"{figure_name}_{node}.csv"))
outputs = DataSavedInModule()
return outputs
def _process_data(
self, data_collection: dict, scenario: str, zone_input: str
) -> pd.DataFrame:
df: pd.DataFrame = data_collection.get(scenario)
df = df.xs(zone_input, level=self.AGG_BY)
df = df.rename(columns={"values": scenario})
return df