Module buildstock_query.tools.upgrades_visualizer.figure
Expand source code
from buildstock_query.tools.upgrades_visualizer.plot_utils import PlotParams, ValueTypes
from buildstock_query.tools.upgrades_visualizer.viz_data import VizData
import plotly.graph_objects as go
import polars as pl
import re
class UpgradesPlot:
    """Build plotly figures, plus a tabular report, from upgrade results.

    All data comes from the ``VizData`` instance given to the constructor. This class reads its
    ``get_plotting_df``, ``get_plotting_df_all_upgrades``, ``upgrade2name`` and ``sample_weight``
    members.
    """

    # Number of (group, sub-group) combinations drawn before the remaining ones are replaced
    # by a single "Too many groups" placeholder.
    _MAX_GROUPS = 500

    def __init__(self, viz_data: VizData) -> None:
        """Store the data source used by every plotting method."""
        self.viz_data = viz_data

    def get_ylabel(self, end_use):
        """Return the y-axis label stem for the selected end-use columns.

        Args:
            end_use: Sequence of column names. Must not be empty.

        Returns:
            The column name itself when there is exactly one. Otherwise
            ``"<n>_fuels_<rest>"``, where ``<rest>`` is the first column name with its
            ``end_use_`` / ``fuel_use_`` prefix and the following ``_``-separated token removed.
        """
        if len(end_use) == 1:
            return end_use[0]
        pure_end_use_name = end_use[0].removeprefix("end_use_").removeprefix("fuel_use_")
        # Drop the leading token (presumably the fuel name - only one token is removed).
        pure_end_use_name = "_".join(pure_end_use_name.split("_")[1:])
        return f"{len(end_use)}_fuels_{pure_end_use_name}"

    def explode_str(self, input_str):
        """Break a value into a tuple of text and integer pieces.

        The value is stringified and lower-cased; a full English month name is replaced by its
        month number. The text is then split on ``<``, ``-`` and runs of ASCII digits. Digit runs
        become ``int``; the ``None`` entries ``re.split`` emits for the capture group that did
        not take part in a match become ``"X"``.

        Presumably used as a sort key so that numeric parts order numerically - confirm with callers.
        """
        month2num = {"january": 1, "february": 2, "march": 3, "april": 4,
                     "may": 5, "june": 6, "july": 7, "august": 8,
                     "september": 9, "october": 10, "november": 11, "december": 12}
        text = str(input_str).lower()
        text = str(month2num.get(text, text))
        pieces = []
        for piece in re.split(r"([\<\-])|([0-9]+)", text):
            if piece is None:
                pieces.append("X")
            elif piece and piece[0] in "0123456789":
                pieces.append(int(piece))
            else:
                pieces.append(piece)
        return tuple(pieces)

    def get_plot(self, params: PlotParams):
        """Fetch the data selected by ``params`` and return ``(figure, report_dataframe)``.

        A single-upgrade dataframe is requested when there are two or more group-by columns, when
        an upgrade is set, or when a distribution/scatter plot has at least one group-by column.
        Otherwise data for all upgrades is requested and ``'upgrade'`` becomes the first grouping.

        Note:
            ``params`` is modified in place: ``upgrade`` and ``group_by`` are normalised before
            the data is fetched.
        """
        single_upgrade = (
            len(params.group_by) >= 2
            or params.upgrade is not None
            or (params.value_type in [ValueTypes.distribution, ValueTypes.scatter]
                and len(params.group_by) >= 1)
        )
        if single_upgrade:
            params.upgrade = params.upgrade if params.upgrade else 0
            params.group_by = ['upgrade'] if not params.group_by else params.group_by
            plot_df = self.viz_data.get_plotting_df(upgrade=params.upgrade, params=params)
        else:
            params.group_by = ['upgrade'] + params.group_by
            plot_df = self.viz_data.get_plotting_df_all_upgrades(params=params)
        return self._get_plot(plot_df, params)

    def _get_axis_titles(self, params: PlotParams):
        """Return ``(xtitle, ytitle)`` for the plot type selected in ``params``."""
        grouped_xtitle = ", ".join(params.group_by[1:]) if len(params.group_by) > 1 else params.group_by[0]
        if params.value_type in [ValueTypes.mean, ValueTypes.total, ValueTypes.count]:
            xtitle = grouped_xtitle
            suffix = f"{params.value_type.value}_{params.savings_type.value}"
        elif params.value_type in [ValueTypes.distribution]:
            xtitle = grouped_xtitle
            suffix = f"{params.savings_type.value}"
        else:
            assert params.value_type in [ValueTypes.scatter]
            xtitle = "baseline_value"
            suffix = f"{params.savings_type.value}"
        return xtitle, f"{self.get_ylabel(params.enduses)}_{suffix}"

    def _get_plot(self, df, params: PlotParams):
        """Turn a plotting dataframe into a figure and a report dataframe.

        ``df`` is grouped by ``params.group_by[0]`` (one trace per group) and, when more group-by
        columns are given, each group is split again by the remaining columns. The code reads the
        ``value`` and ``building_id`` columns, and ``baseline_value`` for scatter plots.

        Returns:
            ``(fig, report)`` where ``report`` is the concatenation of one dataframe per
            first-level group for which such a dataframe could be built.
        """
        fig = go.Figure()
        counter = 0  # sub-groups drawn so far, across all first-level groups
        report_dfs = [pl.DataFrame()]
        xtitle, ytitle = self._get_axis_titles(params)
        multi_group = len(params.group_by) > 1

        for grp0, sub_df in df.groupby(params.group_by[0], maintain_order=True):
            upgrade = int(grp0) if params.group_by[0] == 'upgrade' else params.upgrade
            upgrade = upgrade or 0
            yvals = []
            xvals = []
            second_groups = []
            sample_counts = []
            hovervals = []
            if multi_group:
                second_plots = list(sub_df.groupby(params.group_by[1:], maintain_order=True))
            else:
                second_plots = [(tuple(), sub_df)]

            for second_name, second_df in second_plots:
                # Group keys are not necessarily strings, so stringify before joining.
                name = ','.join(map(str, second_name)) if second_name else str(grp0)
                count = len(second_df)
                mean = second_df['value'].mean()
                if counter >= self._MAX_GROUPS:
                    # Stop drawing and leave a visible placeholder instead.
                    yvals.append(0.1)
                    xvals.append("Too many groups")
                    sample_counts.append(0)
                    hovervals.append("Too many groups")
                    grp0 = "Too many groups"
                    break
                if params.value_type in [ValueTypes.total, ValueTypes.mean, ValueTypes.count]:
                    if params.value_type == ValueTypes.total:
                        val = second_df['value'].sum()
                    elif params.value_type == ValueTypes.count:
                        val = second_df['building_id'].n_unique()
                    else:
                        val = mean
                    yvals.append(float(val))
                    xvals.append(name)
                    sample_counts.append(count)
                    # Parenthesised so that all three pieces are part of the hover text; the
                    # "Units Count" piece used to be a separate, discarded statement.
                    hovertext = (
                        f"{self.viz_data.upgrade2name.get(upgrade)}<br>{grp0}<br>{name}<br>Average {mean}."
                        f"<br>Sample Count: {count}."
                        f"<br>Units Count: {count * self.viz_data.sample_weight}."
                    )
                    hovervals.append(hovertext)
                    second_groups.append(name)
                elif params.value_type in [ValueTypes.distribution, ValueTypes.scatter]:
                    # One hover text per building in this sub-group.
                    hovertext = [f'{self.viz_data.upgrade2name.get(upgrade)}<br>{grp0}<br>{name}<br>Building:'
                                 f'{bid}<br>Sample Count: {count}'
                                 for bid in second_df['building_id'].to_list()]
                    if params.value_type == ValueTypes.distribution:
                        # Flat lists: one entry per building.
                        xvals.extend([name] * count)
                        yvals.extend(second_df['value'].to_list())
                        second_groups.extend([name] * count)
                        sample_counts.extend([count] * count)
                        hovervals.extend(hovertext)
                    else:
                        # Nested lists: one inner list per sub-group.
                        xvals.append(second_df['baseline_value'].to_list())
                        yvals.append(second_df['value'].to_list())
                        second_groups.append(name)
                        sample_counts.append([count] * count)
                        hovervals.append(hovertext)
                counter += 1

            self._add_plot(params, fig, grp0, yvals, xvals, second_groups, hovervals, multi_group)
            try:
                sub_df = pl.DataFrame({xtitle: xvals, ytitle: yvals, 'upgrade': [f'Upgrade {grp0}'] * len(xvals),
                                       'sample_count': sample_counts, 'info': hovervals})
                report_dfs.append(sub_df)
            except Exception:
                # Best effort: the report is optional, so a group whose values cannot be put
                # into a dataframe is left out of the report but still plotted.
                continue

        if params.change_type:
            title = f"{params.value_type} - {params.savings_type} for {params.change_type} buildings"
        else:
            title = f'{params.value_type} - {params.savings_type} value'
        if params.group_by[0] != "upgrade":
            title = f"Upgrade {params.upgrade} - {title}"
        self._update_layout(params, fig, xtitle, ytitle, title, multi_group)
        return fig, pl.concat(report_dfs)

    def _update_layout(self, params: PlotParams, fig, xtitle, ytitle, title, multi_group=False):
        """Apply titles, legend title and grouping mode to ``fig`` for the selected plot type.

        For scatter plots the x title is added as a paper-anchored annotation below the plot
        rather than as ``xaxis_title``.
        """
        if params.value_type in [ValueTypes.mean, ValueTypes.total, ValueTypes.count]:
            fig.update_layout(yaxis_title=ytitle,
                              barmode='group',
                              xaxis_title=xtitle,
                              legend={"title": params.group_by[0]},
                              title=title)
        elif params.value_type in [ValueTypes.distribution]:
            fig.update_layout(yaxis_title=ytitle,
                              boxmode="group" if multi_group else "overlay",
                              xaxis_title=xtitle,
                              title=title,
                              legend={"title": params.group_by[0]},
                              clickmode='event+select')
        elif params.value_type == ValueTypes.scatter:
            fig.update_layout(yaxis_title=ytitle,
                              title=title,
                              boxmode="group" if multi_group else "overlay",
                              legend={"title": params.group_by[0]},
                              clickmode='event+select')
            fig.add_annotation(
                dict(
                    x=0.5,
                    y=-0.20,
                    showarrow=False,
                    text=xtitle,
                    xref="paper",
                    yref="paper",
                    xanchor="center",
                    yanchor="top",
                    font=dict(size=14)
                )
            )

    def _add_plot(self, params: PlotParams, fig, grp0, yvals, xvals, second_groups, hovervals, multi_group=False):
        """Add the trace(s) for one first-level group to ``fig``.

        Bar traces are used for mean/total/count, a box trace for distributions and scatter
        traces for scatter plots (one subplot per sub-group when ``multi_group`` is true).
        """
        if params.value_type in [ValueTypes.mean, ValueTypes.total, ValueTypes.count]:
            fig.add_trace(go.Bar(
                y=yvals,
                x=xvals,
                hovertext=hovervals,
                name=f'{grp0}',
                hoverinfo="all"
            )).update_traces(
                marker={"line": {"width": 0.5, "color": "rgb(0,0,0)"}}
            )
        elif params.value_type == ValueTypes.distribution:
            fig.add_trace(go.Box(
                y=yvals,
                x=xvals,
                name=f'{grp0}',
                boxpoints='suspectedoutliers',
                boxmean=True,  # represent mean
                hovertext=hovervals,
                hoverinfo="all"
            ))
        elif params.value_type == ValueTypes.scatter:
            if multi_group:
                self._add_multi_scatter(fig, grp0, list(zip(second_groups, xvals, yvals, hovervals)))
            else:
                # Without sub-groups the nested lists hold exactly one inner list.
                fig.add_trace(go.Scatter(
                    y=yvals[0],
                    x=xvals[0],
                    name=f'{grp0}',
                    mode='markers',
                    hovertext=hovervals[0],
                    hoverinfo="all"))

    def _add_multi_scatter(self, fig, grp0, plot_tuples):
        """Add one scatter subplot per entry of ``plot_tuples``, laid out side by side.

        Args:
            fig: Figure to add the traces and axis layout to.
            grp0: Name (and legend group) shared by all traces added here.
            plot_tuples: List of ``(name, xvals, yvals, hovervals)``; ``name`` becomes the
                x-axis title of the subplot and ``hovervals`` holds one text per point.
        """
        num_plots = len(plot_tuples)
        if num_plots == 0:
            # Nothing to draw; also avoids dividing by zero below.
            return
        gap_fraction = 0.05  # for a 5% gap
        subplot_width = (1.0 - (num_plots - 1) * gap_fraction) / num_plots
        for index, (name, xvals, yvals, hovervals) in enumerate(plot_tuples):
            domain_start = index * (subplot_width + gap_fraction)
            domain_end = domain_start + subplot_width
            # The first pair of axes has no numeric suffix; later ones are numbered from 2.
            axis_suffix = "" if index == 0 else str(index + 1)
            scatter_xaxis_key = f'x{axis_suffix}'
            scatter_yaxis_key = f'y{axis_suffix}'
            layout_xaxis_key = f'xaxis{axis_suffix}'
            layout_yaxis_key = f'yaxis{axis_suffix}'
            fig.add_trace(go.Scatter(x=xvals,
                                     y=yvals,
                                     # Full per-point list; indexing it with [0] would give
                                     # every point the first building's text.
                                     hovertext=hovervals,
                                     name=grp0,
                                     legendgroup=grp0,
                                     mode='markers',
                                     showlegend=index == 0,
                                     xaxis=scatter_xaxis_key,
                                     yaxis=scatter_yaxis_key,
                                     hoverinfo="all"))
            fig.update_layout({
                layout_xaxis_key: {
                    'domain': [domain_start, domain_end],
                    'anchor': scatter_yaxis_key,
                    'title': name
                },
                layout_yaxis_key: {
                    'anchor': scatter_xaxis_key
                }
            })
Classes
class UpgradesPlot (viz_data: VizData)
-
Expand source code
class UpgradesPlot:
    """Build plotly figures, plus a tabular report, from upgrade results.

    All data comes from the ``VizData`` instance given to the constructor. This class reads its
    ``get_plotting_df``, ``get_plotting_df_all_upgrades``, ``upgrade2name`` and ``sample_weight``
    members.
    """

    # Number of (group, sub-group) combinations drawn before the remaining ones are replaced
    # by a single "Too many groups" placeholder.
    _MAX_GROUPS = 500

    def __init__(self, viz_data: VizData) -> None:
        """Store the data source used by every plotting method."""
        self.viz_data = viz_data

    def get_ylabel(self, end_use):
        """Return the y-axis label stem for the selected end-use columns.

        Args:
            end_use: Sequence of column names. Must not be empty.

        Returns:
            The column name itself when there is exactly one. Otherwise
            ``"<n>_fuels_<rest>"``, where ``<rest>`` is the first column name with its
            ``end_use_`` / ``fuel_use_`` prefix and the following ``_``-separated token removed.
        """
        if len(end_use) == 1:
            return end_use[0]
        pure_end_use_name = end_use[0].removeprefix("end_use_").removeprefix("fuel_use_")
        # Drop the leading token (presumably the fuel name - only one token is removed).
        pure_end_use_name = "_".join(pure_end_use_name.split("_")[1:])
        return f"{len(end_use)}_fuels_{pure_end_use_name}"

    def explode_str(self, input_str):
        """Break a value into a tuple of text and integer pieces.

        The value is stringified and lower-cased; a full English month name is replaced by its
        month number. The text is then split on ``<``, ``-`` and runs of ASCII digits. Digit runs
        become ``int``; the ``None`` entries ``re.split`` emits for the capture group that did
        not take part in a match become ``"X"``.

        Presumably used as a sort key so that numeric parts order numerically - confirm with callers.
        """
        month2num = {"january": 1, "february": 2, "march": 3, "april": 4,
                     "may": 5, "june": 6, "july": 7, "august": 8,
                     "september": 9, "october": 10, "november": 11, "december": 12}
        text = str(input_str).lower()
        text = str(month2num.get(text, text))
        pieces = []
        for piece in re.split(r"([\<\-])|([0-9]+)", text):
            if piece is None:
                pieces.append("X")
            elif piece and piece[0] in "0123456789":
                pieces.append(int(piece))
            else:
                pieces.append(piece)
        return tuple(pieces)

    def get_plot(self, params: PlotParams):
        """Fetch the data selected by ``params`` and return ``(figure, report_dataframe)``.

        A single-upgrade dataframe is requested when there are two or more group-by columns, when
        an upgrade is set, or when a distribution/scatter plot has at least one group-by column.
        Otherwise data for all upgrades is requested and ``'upgrade'`` becomes the first grouping.

        Note:
            ``params`` is modified in place: ``upgrade`` and ``group_by`` are normalised before
            the data is fetched.
        """
        single_upgrade = (
            len(params.group_by) >= 2
            or params.upgrade is not None
            or (params.value_type in [ValueTypes.distribution, ValueTypes.scatter]
                and len(params.group_by) >= 1)
        )
        if single_upgrade:
            params.upgrade = params.upgrade if params.upgrade else 0
            params.group_by = ['upgrade'] if not params.group_by else params.group_by
            plot_df = self.viz_data.get_plotting_df(upgrade=params.upgrade, params=params)
        else:
            params.group_by = ['upgrade'] + params.group_by
            plot_df = self.viz_data.get_plotting_df_all_upgrades(params=params)
        return self._get_plot(plot_df, params)

    def _get_axis_titles(self, params: PlotParams):
        """Return ``(xtitle, ytitle)`` for the plot type selected in ``params``."""
        grouped_xtitle = ", ".join(params.group_by[1:]) if len(params.group_by) > 1 else params.group_by[0]
        if params.value_type in [ValueTypes.mean, ValueTypes.total, ValueTypes.count]:
            xtitle = grouped_xtitle
            suffix = f"{params.value_type.value}_{params.savings_type.value}"
        elif params.value_type in [ValueTypes.distribution]:
            xtitle = grouped_xtitle
            suffix = f"{params.savings_type.value}"
        else:
            assert params.value_type in [ValueTypes.scatter]
            xtitle = "baseline_value"
            suffix = f"{params.savings_type.value}"
        return xtitle, f"{self.get_ylabel(params.enduses)}_{suffix}"

    def _get_plot(self, df, params: PlotParams):
        """Turn a plotting dataframe into a figure and a report dataframe.

        ``df`` is grouped by ``params.group_by[0]`` (one trace per group) and, when more group-by
        columns are given, each group is split again by the remaining columns. The code reads the
        ``value`` and ``building_id`` columns, and ``baseline_value`` for scatter plots.

        Returns:
            ``(fig, report)`` where ``report`` is the concatenation of one dataframe per
            first-level group for which such a dataframe could be built.
        """
        fig = go.Figure()
        counter = 0  # sub-groups drawn so far, across all first-level groups
        report_dfs = [pl.DataFrame()]
        xtitle, ytitle = self._get_axis_titles(params)
        multi_group = len(params.group_by) > 1

        for grp0, sub_df in df.groupby(params.group_by[0], maintain_order=True):
            upgrade = int(grp0) if params.group_by[0] == 'upgrade' else params.upgrade
            upgrade = upgrade or 0
            yvals = []
            xvals = []
            second_groups = []
            sample_counts = []
            hovervals = []
            if multi_group:
                second_plots = list(sub_df.groupby(params.group_by[1:], maintain_order=True))
            else:
                second_plots = [(tuple(), sub_df)]

            for second_name, second_df in second_plots:
                # Group keys are not necessarily strings, so stringify before joining.
                name = ','.join(map(str, second_name)) if second_name else str(grp0)
                count = len(second_df)
                mean = second_df['value'].mean()
                if counter >= self._MAX_GROUPS:
                    # Stop drawing and leave a visible placeholder instead.
                    yvals.append(0.1)
                    xvals.append("Too many groups")
                    sample_counts.append(0)
                    hovervals.append("Too many groups")
                    grp0 = "Too many groups"
                    break
                if params.value_type in [ValueTypes.total, ValueTypes.mean, ValueTypes.count]:
                    if params.value_type == ValueTypes.total:
                        val = second_df['value'].sum()
                    elif params.value_type == ValueTypes.count:
                        val = second_df['building_id'].n_unique()
                    else:
                        val = mean
                    yvals.append(float(val))
                    xvals.append(name)
                    sample_counts.append(count)
                    # Parenthesised so that all three pieces are part of the hover text; the
                    # "Units Count" piece used to be a separate, discarded statement.
                    hovertext = (
                        f"{self.viz_data.upgrade2name.get(upgrade)}<br>{grp0}<br>{name}<br>Average {mean}."
                        f"<br>Sample Count: {count}."
                        f"<br>Units Count: {count * self.viz_data.sample_weight}."
                    )
                    hovervals.append(hovertext)
                    second_groups.append(name)
                elif params.value_type in [ValueTypes.distribution, ValueTypes.scatter]:
                    # One hover text per building in this sub-group.
                    hovertext = [f'{self.viz_data.upgrade2name.get(upgrade)}<br>{grp0}<br>{name}<br>Building:'
                                 f'{bid}<br>Sample Count: {count}'
                                 for bid in second_df['building_id'].to_list()]
                    if params.value_type == ValueTypes.distribution:
                        # Flat lists: one entry per building.
                        xvals.extend([name] * count)
                        yvals.extend(second_df['value'].to_list())
                        second_groups.extend([name] * count)
                        sample_counts.extend([count] * count)
                        hovervals.extend(hovertext)
                    else:
                        # Nested lists: one inner list per sub-group.
                        xvals.append(second_df['baseline_value'].to_list())
                        yvals.append(second_df['value'].to_list())
                        second_groups.append(name)
                        sample_counts.append([count] * count)
                        hovervals.append(hovertext)
                counter += 1

            self._add_plot(params, fig, grp0, yvals, xvals, second_groups, hovervals, multi_group)
            try:
                sub_df = pl.DataFrame({xtitle: xvals, ytitle: yvals, 'upgrade': [f'Upgrade {grp0}'] * len(xvals),
                                       'sample_count': sample_counts, 'info': hovervals})
                report_dfs.append(sub_df)
            except Exception:
                # Best effort: the report is optional, so a group whose values cannot be put
                # into a dataframe is left out of the report but still plotted.
                continue

        if params.change_type:
            title = f"{params.value_type} - {params.savings_type} for {params.change_type} buildings"
        else:
            title = f'{params.value_type} - {params.savings_type} value'
        if params.group_by[0] != "upgrade":
            title = f"Upgrade {params.upgrade} - {title}"
        self._update_layout(params, fig, xtitle, ytitle, title, multi_group)
        return fig, pl.concat(report_dfs)

    def _update_layout(self, params: PlotParams, fig, xtitle, ytitle, title, multi_group=False):
        """Apply titles, legend title and grouping mode to ``fig`` for the selected plot type.

        For scatter plots the x title is added as a paper-anchored annotation below the plot
        rather than as ``xaxis_title``.
        """
        if params.value_type in [ValueTypes.mean, ValueTypes.total, ValueTypes.count]:
            fig.update_layout(yaxis_title=ytitle,
                              barmode='group',
                              xaxis_title=xtitle,
                              legend={"title": params.group_by[0]},
                              title=title)
        elif params.value_type in [ValueTypes.distribution]:
            fig.update_layout(yaxis_title=ytitle,
                              boxmode="group" if multi_group else "overlay",
                              xaxis_title=xtitle,
                              title=title,
                              legend={"title": params.group_by[0]},
                              clickmode='event+select')
        elif params.value_type == ValueTypes.scatter:
            fig.update_layout(yaxis_title=ytitle,
                              title=title,
                              boxmode="group" if multi_group else "overlay",
                              legend={"title": params.group_by[0]},
                              clickmode='event+select')
            fig.add_annotation(
                dict(
                    x=0.5,
                    y=-0.20,
                    showarrow=False,
                    text=xtitle,
                    xref="paper",
                    yref="paper",
                    xanchor="center",
                    yanchor="top",
                    font=dict(size=14)
                )
            )

    def _add_plot(self, params: PlotParams, fig, grp0, yvals, xvals, second_groups, hovervals, multi_group=False):
        """Add the trace(s) for one first-level group to ``fig``.

        Bar traces are used for mean/total/count, a box trace for distributions and scatter
        traces for scatter plots (one subplot per sub-group when ``multi_group`` is true).
        """
        if params.value_type in [ValueTypes.mean, ValueTypes.total, ValueTypes.count]:
            fig.add_trace(go.Bar(
                y=yvals,
                x=xvals,
                hovertext=hovervals,
                name=f'{grp0}',
                hoverinfo="all"
            )).update_traces(
                marker={"line": {"width": 0.5, "color": "rgb(0,0,0)"}}
            )
        elif params.value_type == ValueTypes.distribution:
            fig.add_trace(go.Box(
                y=yvals,
                x=xvals,
                name=f'{grp0}',
                boxpoints='suspectedoutliers',
                boxmean=True,  # represent mean
                hovertext=hovervals,
                hoverinfo="all"
            ))
        elif params.value_type == ValueTypes.scatter:
            if multi_group:
                self._add_multi_scatter(fig, grp0, list(zip(second_groups, xvals, yvals, hovervals)))
            else:
                # Without sub-groups the nested lists hold exactly one inner list.
                fig.add_trace(go.Scatter(
                    y=yvals[0],
                    x=xvals[0],
                    name=f'{grp0}',
                    mode='markers',
                    hovertext=hovervals[0],
                    hoverinfo="all"))

    def _add_multi_scatter(self, fig, grp0, plot_tuples):
        """Add one scatter subplot per entry of ``plot_tuples``, laid out side by side.

        Args:
            fig: Figure to add the traces and axis layout to.
            grp0: Name (and legend group) shared by all traces added here.
            plot_tuples: List of ``(name, xvals, yvals, hovervals)``; ``name`` becomes the
                x-axis title of the subplot and ``hovervals`` holds one text per point.
        """
        num_plots = len(plot_tuples)
        if num_plots == 0:
            # Nothing to draw; also avoids dividing by zero below.
            return
        gap_fraction = 0.05  # for a 5% gap
        subplot_width = (1.0 - (num_plots - 1) * gap_fraction) / num_plots
        for index, (name, xvals, yvals, hovervals) in enumerate(plot_tuples):
            domain_start = index * (subplot_width + gap_fraction)
            domain_end = domain_start + subplot_width
            # The first pair of axes has no numeric suffix; later ones are numbered from 2.
            axis_suffix = "" if index == 0 else str(index + 1)
            scatter_xaxis_key = f'x{axis_suffix}'
            scatter_yaxis_key = f'y{axis_suffix}'
            layout_xaxis_key = f'xaxis{axis_suffix}'
            layout_yaxis_key = f'yaxis{axis_suffix}'
            fig.add_trace(go.Scatter(x=xvals,
                                     y=yvals,
                                     # Full per-point list; indexing it with [0] would give
                                     # every point the first building's text.
                                     hovertext=hovervals,
                                     name=grp0,
                                     legendgroup=grp0,
                                     mode='markers',
                                     showlegend=index == 0,
                                     xaxis=scatter_xaxis_key,
                                     yaxis=scatter_yaxis_key,
                                     hoverinfo="all"))
            fig.update_layout({
                layout_xaxis_key: {
                    'domain': [domain_start, domain_end],
                    'anchor': scatter_yaxis_key,
                    'title': name
                },
                layout_yaxis_key: {
                    'anchor': scatter_xaxis_key
                }
            })
Methods
def explode_str(self, input_str)
-
Expand source code
def explode_str(self, input_str):
    """Break a value into a tuple of text and integer pieces.

    The value is stringified and lower-cased; a full English month name is replaced by its
    month number. The text is then split on ``<``, ``-`` and runs of ASCII digits. Digit runs
    become ``int``; the ``None`` entries ``re.split`` emits for the capture group that did not
    take part in a match become ``"X"``. Empty strings between adjacent matches are kept.

    Presumably used as a sort key so that numeric parts order numerically - confirm with callers.
    """
    month2num = {"january": 1, "february": 2, "march": 3, "april": 4,
                 "may": 5, "june": 6, "july": 7, "august": 8,
                 "september": 9, "october": 10, "november": 11, "december": 12}
    text = str(input_str).lower()
    text = str(month2num.get(text, text))
    pieces = []
    for piece in re.split(r"([\<\-])|([0-9]+)", text):
        if piece is None:
            # Placeholder for the capture group that did not match.
            pieces.append("X")
        elif piece and piece[0] in "0123456789":
            pieces.append(int(piece))
        else:
            pieces.append(piece)
    return tuple(pieces)
def get_plot(self, params: PlotParams)
-
Expand source code
def get_plot(self, params: PlotParams):
    """Fetch the data selected by ``params`` and return the result of ``self._get_plot``.

    A single-upgrade dataframe (``viz_data.get_plotting_df``) is requested when there are two or
    more group-by columns, when an upgrade is set, or when a distribution/scatter plot has at
    least one group-by column. Otherwise data for all upgrades
    (``viz_data.get_plotting_df_all_upgrades``) is requested and ``'upgrade'`` is prepended to
    the group-by columns.

    Note:
        ``params`` is modified in place: ``upgrade`` and ``group_by`` are normalised before the
        data is fetched.
    """
    single_upgrade = (
        len(params.group_by) >= 2
        or params.upgrade is not None
        or (params.value_type in [ValueTypes.distribution, ValueTypes.scatter]
            and len(params.group_by) >= 1)
    )
    if single_upgrade:
        # A missing (or falsy) upgrade means upgrade 0.
        params.upgrade = params.upgrade if params.upgrade else 0
        params.group_by = ['upgrade'] if not params.group_by else params.group_by
        plot_df = self.viz_data.get_plotting_df(upgrade=params.upgrade, params=params)
    else:
        params.group_by = ['upgrade'] + params.group_by
        plot_df = self.viz_data.get_plotting_df_all_upgrades(params=params)
    return self._get_plot(plot_df, params)
def get_ylabel(self, end_use)
-
Expand source code
def get_ylabel(self, end_use):
    """Return the y-axis label stem for the selected end-use columns.

    Args:
        end_use: Sequence of column names. Must not be empty.

    Returns:
        The column name itself when there is exactly one. Otherwise ``"<n>_fuels_<rest>"``,
        where ``<rest>`` is the first column name with its ``end_use_`` / ``fuel_use_`` prefix
        and the following ``_``-separated token removed.
    """
    if len(end_use) == 1:
        return end_use[0]
    pure_end_use_name = end_use[0].removeprefix("end_use_").removeprefix("fuel_use_")
    # Drop the leading token (presumably the fuel name - only one token is removed).
    pure_end_use_name = "_".join(pure_end_use_name.split("_")[1:])
    return f"{len(end_use)}_fuels_{pure_end_use_name}"