Module buildstock_query.aggregate_query
Expand source code
import sqlalchemy as sa
from sqlalchemy.sql import func as safunc
import datetime
import numpy as np
import logging
import buildstock_query.main as main
from buildstock_query.schema.query_params import AnnualQuery, TSQuery
from buildstock_query.schema.helpers import gather_params
from pydantic import validate_arguments
from typing import Union
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
FUELS = ['electricity', 'natural_gas', 'propane', 'fuel_oil', 'coal', 'wood_cord', 'wood_pellets']
class BuildStockAggregate:
"""A class to do aggregation queries for both timeseries and annual results.
"""
def __init__(self, buildstock_query: 'main.BuildStockQuery') -> None:
self._bsq = buildstock_query
@gather_params(AnnualQuery)
def aggregate_annual(self, *,
params: AnnualQuery):
join_list = list(params.join_list) if params.join_list else []
weights = list(params.weights) if params.weights else []
restrict = list(params.restrict) if params.restrict else []
[self._bsq._get_table(jl[0]) for jl in join_list] # ingress all tables in join list
if params.upgrade_id in {None, 0, '0'}:
enduse_cols = self._bsq._get_enduse_cols(params.enduses, table='baseline')
upgrade_id = None
else:
upgrade_id = self._bsq._validate_upgrade(params.upgrade_id)
enduse_cols = self._bsq._get_enduse_cols(params.enduses, table='upgrade')
total_weight = self._bsq._get_weight(weights)
enduse_selection = [safunc.sum(enduse * total_weight).label(self._bsq._simple_label(enduse.name))
for enduse in enduse_cols]
if params.get_quartiles:
enduse_selection += [sa.func.approx_percentile(enduse, [0, 0.02, 0.25, 0.5, 0.75, 0.98, 1]).label(
f"{self._bsq._simple_label(enduse.name)}__quartiles") for enduse in enduse_cols]
if params.get_nonzero_count:
enduse_selection += [safunc.sum(sa.case((safunc.coalesce(enduse, 0) != 0, 1), else_=0)
* total_weight).label(f"{self._bsq._simple_label(enduse.name)}__nonzero_units_count")
for enduse in enduse_cols]
grouping_metrics_selection = [safunc.sum(1).label("sample_count"),
safunc.sum(total_weight).label("units_count")]
if not params.group_by:
query = sa.select(grouping_metrics_selection + enduse_selection)
group_by_selection = []
else:
group_by_selection = self._bsq._process_groupby_cols(params.group_by, annual_only=True)
query = sa.select(group_by_selection + grouping_metrics_selection + enduse_selection)
# jj = self.bs_table.join(self.ts_table, self.ts_table.c['building_id']==self.bs_table.c['building_id'])
# self._compile(query.select_from(jj))
if upgrade_id not in [None, 0, '0']:
if self._bsq.up_table is None:
raise ValueError("The run doesn't contain upgrades")
tbljoin = self._bsq.bs_table.join(
self._bsq.up_table, sa.and_(self._bsq.bs_table.c[self._bsq.building_id_column_name] ==
self._bsq.up_table.c[self._bsq.building_id_column_name],
self._bsq.up_table.c["upgrade"] == str(upgrade_id),
self._bsq._up_successful_condition))
query = query.select_from(tbljoin)
restrict = [(self._bsq._bs_completed_status_col, [self._bsq.db_schema.completion_values.success])] + restrict
query = self._bsq._add_join(query, join_list)
query = self._bsq._add_restrict(query, restrict)
query = self._bsq._add_group_by(query, group_by_selection)
query = self._bsq._add_order_by(query, group_by_selection if params.sort else [])
if params.get_query_only:
return self._bsq._compile(query)
return self._bsq.execute(query)
def _aggregate_timeseries_light(self,
params: TSQuery
):
"""
Lighter version of aggregate_timeseries in which each enduse is submitted as a separate query to reduce
the load on Athena. For information on the input parameters, see the documentation of aggregate_timeseries.
"""
enduse_cols = self._bsq._get_enduse_cols(params.enduses, table='timeseries')
batch_queries_to_submit = []
for enduse in enduse_cols:
new_query = params.copy()
new_query.enduses = [enduse.name]
new_query.split_enduses = False
query = self.aggregate_timeseries(params=new_query)
batch_queries_to_submit.append(query)
if params.get_query_only:
logger.warning("Not recommended to use get_query_only and split_enduses used together."
" The results from the queries cannot be directly combined to get the desired result."
" There are further processing done in the function. The queries should be used for"
" information or debugging purpose only. Use get_query_only=False to get proper result.")
return batch_queries_to_submit
batch_query_id = self._bsq.submit_batch_query(batch_queries_to_submit)
result_dfs = self._bsq.get_batch_query_result(batch_id=batch_query_id, combine=False)
logger.info("Joining the individual enduses result into a single DataFrame")
group_by = self._bsq._clean_group_by(params.group_by)
for res in result_dfs:
res.set_index(group_by, inplace=True)
self.result_dfs = result_dfs
joined_enduses_df = result_dfs[0].drop(columns=['query_id'])
for enduse, res in list(zip(params.enduses, result_dfs))[1:]:
if not isinstance(enduse, str):
enduse = enduse.name
joined_enduses_df = joined_enduses_df.join(res[[enduse]])
logger.info("Joining Completed.")
return joined_enduses_df.reset_index()
@gather_params(TSQuery)
def aggregate_timeseries(self, params: TSQuery):
if self._bsq.ts_table is None:
raise ValueError("Not timeseries table available")
upgrade_id = self._bsq._validate_upgrade(params.upgrade_id)
if params.timestamp_grouping_func and \
params.timestamp_grouping_func not in ['hour', 'day', 'month']:
raise ValueError("timestamp_grouping_func must be one of ['hour', 'day', 'month']")
if params.split_enduses:
return self._aggregate_timeseries_light(params)
[self._bsq._get_table(jl[0]) for jl in params.join_list] # ingress all tables in join list
enduse_cols = self._bsq._get_enduse_cols(params.enduses, table='timeseries')
total_weight = self._bsq._get_weight(params.weights)
enduse_selection = [safunc.sum(enduse * total_weight).label(self._bsq._simple_label(enduse.name))
for enduse in enduse_cols]
group_by = list(params.group_by)
if self._bsq.timestamp_column_name not in group_by and params.collapse_ts:
logger.info("Aggregation done accross timestamps. Result no longer a timeseries.")
# The aggregation is done across time so we should correct sample_count and units_count
rows_per_building = self._bsq._get_rows_per_building()
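# (e.g., a full-year run at 15-minute resolution has 365 * 96 = 35,040 rows
# per building, so dividing sum(1) by 35,040 recovers the number of sampled buildings)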
grouping_metrics_selection = [(safunc.sum(1) / rows_per_building).label(
"sample_count"), safunc.sum(total_weight / rows_per_building).label("units_count")]
elif self._bsq.timestamp_column_name not in group_by:
group_by.append(self._bsq.timestamp_column_name)
grouping_metrics_selection = [safunc.sum(1).label(
"sample_count"), safunc.sum(total_weight).label("units_count")]
elif params.collapse_ts:
raise ValueError("collapse_ts is true, but there is timestamp column in group_by.")
else:
grouping_metrics_selection = [safunc.sum(1).label(
"sample_count"), safunc.sum(total_weight).label("units_count")]
if (colname := self._bsq.timestamp_column_name) in group_by and \
params.timestamp_grouping_func:
# sample_count = count(distinct(building_id))
# units_count = count(distinct(building_id)) * sum(total_weight) / sum(1)
grouping_metrics_selection = [safunc.count(safunc.distinct(self._bsq.ts_bldgid_column)).
label("sample_count"),
(safunc.count(safunc.distinct(self._bsq.ts_bldgid_column)) *
safunc.sum(total_weight) / safunc.sum(1)).label("units_count"),
(safunc.sum(1) / safunc.count(safunc.distinct(self._bsq.ts_bldgid_column))).
label("rows_per_sample"), ]
indx = group_by.index(colname)
sim_info = self._bsq._get_simulation_info()
if sim_info.offset > 0:
# If timestamps are not period-beginning, make them so for timestamp_grouping_func aggregation.
new_col = sa.func.date_trunc(params.timestamp_grouping_func,
sa.func.date_add(sim_info.unit, -sim_info.offset,
self._bsq.timestamp_column)).label(colname)
else:
new_col = sa.func.date_trunc(params.timestamp_grouping_func, self._bsq.timestamp_column).label(colname)
group_by[indx] = new_col
group_by_selection = self._bsq._process_groupby_cols(group_by, annual_only=False)
query = sa.select(group_by_selection + grouping_metrics_selection + enduse_selection)
query = query.join(self._bsq.bs_table, self._bsq.bs_bldgid_column == self._bsq.ts_bldgid_column)
if params.join_list:
query = self._bsq._add_join(query, params.join_list)
group_by_names = [g.name for g in group_by_selection]
upgrade_in_restrict = any(entry[0] == 'upgrade' for entry in params.restrict)
if self._bsq.up_table is not None and not upgrade_in_restrict and 'upgrade' not in group_by_names:
logger.info(f"Restricting query to Upgrade {upgrade_id}.")
params.restrict = list(params.restrict) + [(self._bsq._ts_upgrade_col, [upgrade_id])]
query = self._bsq._add_restrict(query, params.restrict)
query = self._bsq._add_group_by(query, group_by_selection)
query = self._bsq._add_order_by(query, group_by_selection if params.sort else [])
query = query.limit(params.limit) if params.limit else query
if params.get_query_only:
return self._bsq._compile(query)
return self._bsq.execute(query)
@validate_arguments(config=dict(smart_union=True))
def get_building_average_kws_at(self, *,
at_hour: Union[list[float], float],
at_days: list[float],
enduses: list[str],
get_query_only: bool = False):
"""
Aggregates the timeseries result on select enduses, for the given days and hours.
If all of the hour(s) fall exactly on the simulation timestamps, the aggregation is done by averaging the kW at
those timestamps. If any of the hour(s) fall between timestamps, the following process is followed:
i. The average kW is calculated at the timestamps at, or just after, the given hour. Call it upper_kw.
ii. The average kW is calculated at the timestamps at, or just before, the given hour. Call it lower_kw.
iii. Return the interpolation between upper_kw and lower_kw based on the average location of the hour(s)
between the upper and lower timestamps.
Check the argument description below to learn about additional features and options.
Args:
at_hour: The hour(s) at which the average kW of buildings needs to be calculated. It can either be a
single number if the hour is the same for all days, or a list of numbers if the kW needs to be
calculated at different hours on different days.
at_days: The list of days (of the year) for which the average kW is to be calculated.
enduses: The list of enduses for which to calculate the average kW.
get_query_only: Skips submitting the query to Athena and just returns the query strings. Useful for batch
submitting multiple queries or debugging.
Returns:
If get_query_only is True, returns the two queries that get the kW at the two timestamps immediately to
the left and right of the supplied hour(s).
If get_query_only is False, returns the average kW of each building at the given hour(s) across the
supplied days.
"""
if isinstance(at_hour, list):
if len(at_hour) != len(at_days) or not at_hour:
raise ValueError("The length of at_hour list should be the same as length of at_days list and"
" not be empty")
elif isinstance(at_hour, (float, int)):
at_hour = [at_hour] * len(at_days)
else:
raise ValueError("At hour should be a list or a number")
enduse_cols = self._bsq._get_enduse_cols(enduses, table='timeseries')
total_weight = self._bsq._get_weight([])
sim_info = self._bsq._get_simulation_info()
sim_year, sim_interval_seconds = sim_info.year, sim_info.interval
kw_factor = 3600.0 / sim_interval_seconds
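# e.g., a 900-second (15-minute) interval gives kw_factor = 4; assuming the enduse
# columns store energy in kWh per timestep, multiplying by 4 yields average kW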
enduse_selection = [safunc.avg(enduse * total_weight * kw_factor).label(self._bsq._simple_label(enduse.name))
for enduse in enduse_cols]
grouping_metrics_selection = [safunc.sum(1).label("sample_count"),
safunc.sum(total_weight).label("units_count")]
def get_upper_timestamps(day, hour):
new_dt = datetime.datetime(year=sim_year, month=1, day=1)
if round(hour * 3600 % sim_interval_seconds, 2) == 0:
# if the hour falls exactly on the simulation timestamp, use the same timestamp
# for both lower and upper
add = 0
else:
add = 1
upper_dt = new_dt + datetime.timedelta(days=day, seconds=sim_interval_seconds *
(int(hour * 3600 / sim_interval_seconds) + add))
if upper_dt.year > sim_year:
upper_dt = new_dt + datetime.timedelta(days=day, seconds=sim_interval_seconds *
(int(hour * 3600 / sim_interval_seconds)))
return upper_dt
def get_lower_timestamps(day, hour):
new_dt = datetime.datetime(year=sim_year, month=1, day=1)
lower_dt = new_dt + datetime.timedelta(days=day, seconds=sim_interval_seconds * int(hour * 3600 /
sim_interval_seconds))
return lower_dt
# check if the supplied hours fall exactly on the simulation timestamps
exact_times = np.all([round(h * 3600 % sim_interval_seconds, 2) == 0 for h in at_hour])
lower_timestamps = [get_lower_timestamps(d - 1, h) for d, h in zip(at_days, at_hour)]
upper_timestamps = [get_upper_timestamps(d - 1, h) for d, h in zip(at_days, at_hour)]
query = sa.select([self._bsq.ts_bldgid_column] + grouping_metrics_selection + enduse_selection)
query = query.join(self._bsq.bs_table, self._bsq.bs_bldgid_column == self._bsq.ts_bldgid_column)
query = self._bsq._add_group_by(query, [self._bsq.ts_bldgid_column])
query = self._bsq._add_order_by(query, [self._bsq.ts_bldgid_column])
lower_val_query = self._bsq._add_restrict(query, [(self._bsq.timestamp_column_name, lower_timestamps)])
upper_val_query = self._bsq._add_restrict(query, [(self._bsq.timestamp_column_name, upper_timestamps)])
if exact_times:
# only one query is sufficient if the hours fall exactly on timestamps
queries = [lower_val_query]
else:
queries = [lower_val_query, upper_val_query]
query_strs = [self._bsq._compile(q) for q in queries]
if get_query_only:
return query_strs
batch_id = self._bsq.submit_batch_query(query_strs)
if exact_times:
vals, = self._bsq.get_batch_query_result(batch_id, combine=False)
return vals
else:
lower_vals, upper_vals = self._bsq.get_batch_query_result(batch_id, combine=False)
avg_upper_weight = np.mean([min_of_hour / sim_interval_seconds for hour in at_hour if
(min_of_hour := hour * 3600 % sim_interval_seconds)])
avg_lower_weight = 1 - avg_upper_weight
# modify the lower vals to make it a weighted average of upper and lower vals
lower_vals[enduses] = lower_vals[enduses] * avg_lower_weight + upper_vals[enduses] * avg_upper_weight
return lower_vals
Classes
class BuildStockAggregate (buildstock_query: main.BuildStockQuery)
-
A class to do aggregation queries for both timeseries and annual results.
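A minimal construction sketch. The BuildStockQuery constructor arguments below are hypothetical (they depend on the Athena setup); only the BuildStockAggregate(buildstock_query) call follows the __init__ shown above:
from buildstock_query.main import BuildStockQuery
from buildstock_query.aggregate_query import BuildStockAggregate
my_run = BuildStockQuery(workgroup='my_workgroup',  # hypothetical workgroup
                         db_name='my_athena_db',    # hypothetical database
                         table_name='my_run_table') # hypothetical run table
agg = BuildStockAggregate(my_run)  # wraps the BuildStockQuery instance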
Methods
def aggregate_annual(self, *, params: AnnualQuery)
-
Aggregates the annual results for the given enduses, with optional grouping, weighting, joins, and restrictions; see the example sketch below.
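A hedged example call. The AnnualQuery field names below appear in the method body, but the column names are hypothetical and fields not set take their schema defaults:
from buildstock_query.schema.query_params import AnnualQuery
result = agg.aggregate_annual(params=AnnualQuery(
    enduses=['fuel_use__electricity__total__kwh'],  # hypothetical enduse column
    group_by=['geometry_building_type'],            # hypothetical characteristics column
    get_quartiles=True,     # adds __quartiles columns via approx_percentile
    get_query_only=True))   # returns the compiled SQL instead of executing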
def aggregate_timeseries(self, params: TSQuery)
-
Aggregates the timeseries results for the given enduses, grouped by the given columns; the time resolution can be coarsened with timestamp_grouping_func or collapsed entirely with collapse_ts. See the sketch below.
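A hedged example sketch. The enduse name is hypothetical, fields not set take their schema defaults, and timestamp_grouping_func must be 'hour', 'day', or 'month' per the validation in the source:
from buildstock_query.schema.query_params import TSQuery
monthly_df = agg.aggregate_timeseries(params=TSQuery(
    enduses=['end_use__electricity__total__kwh'],  # hypothetical enduse column
    timestamp_grouping_func='month'))  # truncate timestamps to month before grouping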
def get_building_average_kws_at(self, *, at_hour: Union[list[float], float], at_days: list[float], enduses: list[str], get_query_only: bool = False)
-
Aggregates the timeseries result on select enduses, for the given days and hours. If all of the hour(s) fall exactly on the simulation timestamps, the aggregation is done by averaging the kW at those timestamps. If any of the hour(s) fall between timestamps, the following process is followed: i. The average kW is calculated at the timestamps at, or just after, the given hour; call it upper_kw. ii. The average kW is calculated at the timestamps at, or just before, the given hour; call it lower_kw. iii. The interpolation between upper_kw and lower_kw is returned, based on the average location of the hour(s) between the upper and lower timestamps.
Check the argument description below to learn about additional features and options.
Args
at_hour
- The hour(s) at which the average kW of buildings needs to be calculated. It can either be a single number if the hour is the same for all days, or a list of numbers if the kW needs to be calculated at different hours on different days.
at_days
- The list of days (of the year) for which the average kW is to be calculated.
enduses
- The list of enduses for which to calculate the average kW.
get_query_only
- Skips submitting the query to Athena and just returns the query strings. Useful for batch submitting multiple queries or debugging.
Returns
If get_query_only is True, returns the two queries that get the kW at the two timestamps immediately to the left and right of the supplied hour(s). If get_query_only is False, returns the average kW of each building at the given hour(s) across the supplied days.
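To make the interpolation concrete: with a 900-second (15-minute) simulation interval, at_hour=17.3 is 62280 seconds into the day; 62280 % 900 = 180, so the hour falls between the 17:15 and 17:30 timestamps, avg_upper_weight = 180/900 = 0.2, and the result is 0.8 * lower_kw + 0.2 * upper_kw. A hedged usage sketch (enduse name hypothetical):
avg_kw_df = agg.get_building_average_kws_at(
    at_hour=17.3,                    # 17:18, between two 15-minute timestamps
    at_days=[152, 153, 154],         # days of the year to average over
    enduses=['electricity_total'])   # hypothetical enduse column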