Module buildstock_query.tools.characteristics_visualizer.viz_data
Source code
from buildstock_query import BuildStockQuery, KWH2MBTU
from pydantic import validate_arguments
import polars as pl
from buildstock_query.tools.upgrades_visualizer.plot_utils import PlotParams
from typing import Union
num2month = {1: "January", 2: "February", 3: "March", 4: "April",
5: "May", 6: "June", 7: "July", 8: "August",
9: "September", 10: "October", 11: "November", 12: "December"}
fuels_types = ['electricity', 'natural_gas', 'propane', 'fuel_oil', 'coal', 'wood_cord', 'wood_pellets']
class VizData:
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def __init__(self, yaml_path: str, opt_sat_path: str,
db_name: str,
run: Union[str, tuple[str, str]],
workgroup: str = 'largeee',
buildstock_type: str = 'resstock',
skip_init: bool = False):
if isinstance(run, tuple):
# Allows for separate baseline and upgrade runs
# In this case, run[0] is the baseline run and run[1] is the upgrade run
self.baseline_run = BuildStockQuery(workgroup=workgroup,
db_name=db_name,
buildstock_type=buildstock_type,
table_name=run[0],
skip_reports=skip_init)
baseline_table_name = run[0] + "_baseline"
upgrade_table_name = run[1] + "_upgrades"
ts_table_name = run[1] + "_timeseries"
table = (baseline_table_name, ts_table_name, upgrade_table_name)
else:
# If only one run is specified, it is assumed to contain both baseline and upgrade data
self.baseline_run = None
table = run
self.main_run = BuildStockQuery(workgroup=workgroup,
db_name=db_name,
buildstock_type=buildstock_type,
table_name=table,
skip_reports=skip_init)
self.yaml_path = yaml_path
self.opt_sat_path = opt_sat_path
if not skip_init:
self.initialize()
def initialize(self):
self.ua = self.main_run.get_upgrades_analyzer(self.yaml_path, self.opt_sat_path)
self.report = pl.from_pandas(self.main_run.report.get_success_report(), include_index=True)
self.available_upgrades = list(sorted(set(self.report["upgrade"].unique()) - {0}))
self.upgrade2name = {indx+1: f"Upgrade {indx+1}: {upgrade['upgrade_name']}" for indx,
upgrade in enumerate(self.ua.get_cfg().get('upgrades', []))}
self.upgrade2name[0] = "Upgrade 0: Baseline"
self.upgrade2shortname = {indx+1: f"Upgrade {indx+1}" for indx,
upgrade in enumerate(self.ua.get_cfg().get('upgrades', []))}
self.chng2bldg = self.get_change2bldgs()
self.init_annual_results()
self.init_monthly_results(self.metadata_df)
self.all_upgrade_plotting_df = None
def run_obj(self, upgrade: int) -> BuildStockQuery:
if upgrade == 0 and self.baseline_run is not None:
return self.baseline_run
return self.main_run
def get_change2bldgs(self):
change_types = ["any", "no-chng", "bad-chng", "ok-chng", "true-bad-chng", "true-ok-chng"]
chng2bldg = {}
for chng in change_types:
for upgrade in self.available_upgrades:
print(f"Getting buildings for {upgrade} and {chng}")
chng2bldg[(upgrade, chng)] = self.main_run.report.get_buildings_by_change(upgrade_id=int(upgrade),
change_type=chng)
return chng2bldg
def _get_results_csv_clean(self, upgrade: int):
if upgrade == 0:
res_df = pl.read_parquet(self.run_obj(upgrade)._download_results_csv())
else:
res_df = pl.read_parquet(self.run_obj(upgrade)._download_upgrades_csv(upgrade_id=upgrade))
res_df = res_df.filter(pl.col('completed_status') == 'success')
res_df = res_df.drop([col for col in res_df.columns if
"applicable" in col
or "output_format" in col])
res_df = res_df.rename({'upgrade_costs.upgrade_cost_usd': 'upgrade_cost_total_usd'})
res_df = res_df.rename({x: x.split('.')[1] for x in res_df.columns if '.' in x})
res_df = res_df.with_columns(upgrade=pl.lit(upgrade))
res_df = res_df.with_columns(month=pl.lit('All'))
self.run_obj(upgrade).save_cache()
return res_df
def _get_metadata_df(self):
bs_res_df = pl.read_parquet(self.run_obj(0)._download_results_csv())
metadata_cols = [c for c in bs_res_df.columns if c.startswith(self.char_prefix)]
metadata_df = bs_res_df.select(['building_id'] + metadata_cols)
metadata_df = metadata_df.rename({x: x.split('.')[1] for x in metadata_df.columns if '.' in x})
return metadata_df
def init_annual_results(self):
self.bs_res_df = self._get_results_csv_clean(0)
self.metadata_df = self._get_metadata_df()
self.sample_weight = self.metadata_df['sample_weight'][0]
self.upgrade2res = {0: self.bs_res_df}
for upgrade in self.available_upgrades:
print(f"Getting up_csv for {upgrade}")
up_csv = self._get_results_csv_clean(upgrade)
up_csv = up_csv.join(self.metadata_df, on='building_id')
self.upgrade2res[upgrade] = up_csv
def _get_ts_enduse_cols(self, upgrade):
rub_obj = self.run_obj(upgrade)
assert rub_obj.ts_table is not None, "No timeseries table found"
all_cols = [str(col.name) for col in rub_obj.get_cols(table=rub_obj.ts_table)]
enduse_cols = filter(lambda x: x.endswith(('_kbtu', '_kwh', 'lb')), all_cols)
return list(enduse_cols)
def init_monthly_results(self, metadata_df):
self.upgrade2res_monthly: dict[int, pl.DataFrame] = {}
for upgrade in [0] + self.available_upgrades:
ts_cols = self._get_ts_enduse_cols(upgrade)
print(f"Getting monthly results for {upgrade} for {ts_cols}")
run_obj = self.run_obj(upgrade)
query_string = run_obj.agg.aggregate_timeseries(enduses=ts_cols,
group_by=[run_obj.bs_bldgid_column],
upgrade_id=upgrade,
timestamp_grouping_func='month',
get_query_only=True)
print(query_string)
monthly_vals = run_obj.execute(query_string)
run_obj.save_cache()
monthly_df = pl.from_pandas(monthly_vals, include_index=True)
monthly_df = monthly_df.with_columns(pl.col('time').dt.month().alias("month"))
monthly_df = monthly_df.with_columns(pl.col('month').map_dict(num2month).alias("month"))
modified_cols = []
for col in ts_cols:
# scale values down to per building and convert to m_btu to match with annual results
if col.endswith('_kwh'):
modified_cols.append((KWH2MBTU * pl.col(col) / pl.col("units_count"))
.alias(col.replace("kwh", "m_btu").replace("__", "_")))
if col.endswith("_kbtu"):
modified_cols.append((0.001 * pl.col(col) / pl.col("units_count"))
.alias(col.replace("kbtu", "m_btu").replace("__", "_")))
else:
modified_cols.append((pl.col(col) / pl.col("units_count"))
.alias(col.replace("__", "_")))
monthly_df = monthly_df.select(['building_id', 'month'] + modified_cols
+ [pl.lit(upgrade).alias("upgrade")])
monthly_df = monthly_df.join(metadata_df, on='building_id')
self.upgrade2res_monthly[upgrade] = monthly_df
def get_values(self,
upgrade: int,
params: PlotParams,
) -> pl.DataFrame:
df = self.upgrade2res[upgrade] if params.resolution == 'annual' else self.upgrade2res_monthly[upgrade]
if params.filter_bldgs:
df = df.filter(pl.col('building_id').is_in(set(params.filter_bldgs)))
missing_cols = (pl.lit(0).alias(c) for c in params.enduses if c not in df.columns)
df = df.with_columns(missing_cols) # add missing cols as zero
value_cols = [pl.sum_horizontal([pl.col(c).fill_null(0) for c in params.enduses]).alias("value")]
other_cols = ['building_id'] + params.group_by
if 'month' not in params.group_by:
other_cols += ['month']
return df.select(other_cols + value_cols)
def get_plotting_df(self, upgrade: int,
params: PlotParams,):
baseline_df = self.get_values(upgrade=0, params=params)
baseline_df = baseline_df.select("building_id", "month", pl.col("value").alias("baseline_value"))
up_df = self.get_values(upgrade=upgrade, params=params)
if params.change_type:
chng_upgrade = int(params.sync_upgrade) if params.sync_upgrade else int(upgrade) if upgrade else 0
if chng_upgrade and chng_upgrade > 0:
change_bldg_list = self.chng2bldg[(chng_upgrade, params.change_type)]
else:
change_bldg_list = []
change_bldg_list = set(change_bldg_list).intersection(up_df['building_id'].to_list())
up_df = up_df.filter(pl.col("building_id").is_in(change_bldg_list))
up_df = up_df.join(baseline_df, on=('building_id', 'month'), how='left')
if params.savings_type == "Savings":
up_df = up_df.with_columns((pl.col("baseline_value") - pl.col("value")).alias("value"))
elif params.savings_type == "Percent Savings":
up_df = up_df.with_columns((100 * (1 - pl.col("value") / pl.col("baseline_value"))).alias("value"))
# handle case when baseline is 0
up_df = up_df.with_columns(
pl.when(pl.col("baseline_value") == 0)
.then(-100)
.otherwise(pl.col("value"))
.alias("value"))
up_df = up_df.with_columns(
pl.when((pl.col("baseline_value") == 0) & (pl.col("value").is_between(-1e-6, 1e-6)))
.then(0)
.otherwise(pl.col("value"))
.alias("value")
)
return up_df
def get_all_cols(self, resolution: str) -> list[str]:
if resolution == 'annual':
return self.bs_res_df.columns
else:
return self.upgrade2res_monthly[0].columns
def get_all_end_use_cols(self, resolution: str) -> list[str]:
all_cols = self.get_all_cols(resolution=resolution)
all_end_use_cols = filter(lambda col: col.startswith(("end_use_", "energy_use_", "fuel_use_")), all_cols)
return list(all_end_use_cols)
def get_emissions_cols(self, resolution: str) -> list[str]:
all_cols = self.get_all_cols(resolution=resolution)
all_emissions_cols = filter(lambda c: c.startswith("emissions_"), all_cols)
return list(all_emissions_cols)
def get_cleaned_up_end_use_cols(self, resolution: str, fuel) -> list[str]:
cols = []
all_end_use_cols = self.get_all_end_use_cols(resolution=resolution)
sep = "_"
for c in all_end_use_cols:
if fuel in c or fuel == 'All':
c = c.removeprefix(f"end_use{sep}{fuel}{sep}")
c = c.removeprefix(f"fuel_use{sep}{fuel}{sep}")
if fuel == 'All':
for f in sorted(fuels_types):
c = c.removeprefix(f"end_use{sep}{f}{sep}")
c = c.removeprefix(f"fuel_use{sep}{f}{sep}")
cols.append(c)
no_dup_cols = {c: None for c in cols}
return list(no_dup_cols.keys())
def get_end_use_db_cols(self, resolution, fuel, end_use):
all_enduses = self.get_all_end_use_cols(resolution=resolution)
if not end_use:
return all_enduses[0]
valid_cols = []
sep = "_"
prefix = "fuel_use" if end_use.startswith("total") else "end_use"
if fuel == 'All':
valid_cols.extend(f"{prefix}{sep}{f}{sep}{end_use}" for f in fuels_types
if f"{prefix}{sep}{f}{sep}{end_use}" in all_enduses)
else:
valid_cols.append(f"{prefix}{sep}{fuel}{sep}{end_use}")
return valid_cols
def get_plotting_df_all_upgrades(self,
params: PlotParams,
) -> pl.DataFrame:
df_list = []
for upgrade in [0] + self.available_upgrades:
df = self.get_plotting_df(upgrade=upgrade, params=params)
df = df.with_columns(pl.lit(upgrade).alias("upgrade"))
df_list.append(df)
return pl.concat(df_list).fill_null(0)
Classes
class VizData (yaml_path: str, opt_sat_path: str, db_name: str, run: Union[str, tuple[str, str]], workgroup: str = 'largeee', buildstock_type: str = 'resstock', skip_init: bool = False)
-
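A minimal construction sketch. The YAML path, options-saturation CSV path, database name, and run/table names below are hypothetical placeholders rather than values taken from this module; the two calls mirror the single-run and split baseline/upgrade-run branches of __init__.

from buildstock_query.tools.characteristics_visualizer.viz_data import VizData

# Single run that contains both baseline and upgrade results
viz = VizData(yaml_path="project.yml",
              opt_sat_path="options_saturations.csv",
              db_name="my_athena_db",
              run="my_run_table")

# Separate baseline and upgrade runs: run[0] is the baseline run, run[1] the upgrade run
viz_split = VizData(yaml_path="project.yml",
                    opt_sat_path="options_saturations.csv",
                    db_name="my_athena_db",
                    run=("my_baseline_run", "my_upgrade_run"),
                    skip_init=True)  # skip_init=True defers the (slow) initialize() step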
Methods
def get_all_cols(self, resolution: str) ‑> list[str]
-
def get_all_end_use_cols(self, resolution: str) ‑> list[str]
-
def get_change2bldgs(self)
-
def get_cleaned_up_end_use_cols(self, resolution: str, fuel) ‑> list[str]
-
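An illustrative sketch of the cleanup, using the viz instance from the constructor example above; the column names are examples of the end_use_/fuel_use_ naming this method strips and are not guaranteed to exist in every run:

# "end_use_electricity_heating_m_btu" and "end_use_natural_gas_heating_m_btu" both
# collapse to "heating_m_btu"; "fuel_use_electricity_total_m_btu" becomes "total_m_btu"
cleaned = viz.get_cleaned_up_end_use_cols(resolution="annual", fuel="All")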
def get_emissions_cols(self, resolution: str) ‑> list[str]
-
def get_end_use_db_cols(self, resolution, fuel, end_use)
-
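A sketch of the reverse mapping, again with illustrative end-use names: given a fuel and a cleaned end-use name, the method rebuilds the database column names, using the fuel_use_ prefix for "total*" end uses and end_use_ otherwise.

viz.get_end_use_db_cols("annual", fuel="electricity", end_use="heating_m_btu")
# -> ["end_use_electricity_heating_m_btu"]
viz.get_end_use_db_cols("annual", fuel="All", end_use="total_m_btu")
# -> one "fuel_use_<fuel>_total_m_btu" entry per fuel present in the results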
def get_plotting_df(self, upgrade: int, params: PlotParams)
-
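A hedged usage sketch. PlotParams comes from buildstock_query.tools.upgrades_visualizer.plot_utils; only the fields this method actually reads (resolution, enduses, group_by, filter_bldgs, change_type, sync_upgrade, savings_type) are shown, with illustrative values, and the real PlotParams may require additional fields.

from buildstock_query.tools.upgrades_visualizer.plot_utils import PlotParams

params = PlotParams(
    resolution="annual",
    enduses=["end_use_electricity_heating_m_btu"],   # illustrative column name
    group_by=["geometry_building_type_recs"],        # illustrative characteristic
    filter_bldgs=[],
    change_type=None,
    sync_upgrade=None,
    savings_type="Savings",
)
savings_df = viz.get_plotting_df(upgrade=1, params=params)  # per-building savings relative to upgrade 0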
def get_plotting_df_all_upgrades(self, params: PlotParams) ‑> polars.dataframe.frame.DataFrame
-
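Continuing the sketch above, the all-upgrades variant stacks one frame per upgrade (including the baseline, upgrade 0) and tags each row with an "upgrade" column:

import polars as pl

all_df = viz.get_plotting_df_all_upgrades(params=params)
all_df.filter(pl.col("upgrade") == 2)  # rows for upgrade 2 only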
def get_values(self, upgrade: int, params: PlotParams) ‑> polars.dataframe.frame.DataFrame
-
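A sketch of this lower-level accessor: it returns one row per building (and month, at monthly resolution) with the requested end uses summed into a single "value" column, alongside building_id and the group_by columns.

values_df = viz.get_values(upgrade=0, params=params)
# columns: building_id, <group_by columns>, month, value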
def init_annual_results(self)
-
def init_monthly_results(self, metadata_df)
-
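For reference, the unit handling inside this method: kWh columns are scaled by KWH2MBTU (1 kWh ≈ 0.003412 MBtu) and kBtu columns by 0.001, and every column is divided by units_count so the monthly values match the scale and MBtu units of the annual results. A one-value sketch of the same arithmetic:

from buildstock_query import KWH2MBTU

monthly_kwh = 1200.0   # illustrative monthly electricity total for one model
units_count = 3        # illustrative number of dwelling units
per_unit_m_btu = KWH2MBTU * monthly_kwh / units_count  # ≈ 1.36 MBtu, assuming KWH2MBTU ≈ 0.003412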
def initialize(self)
-
def run_obj(self, upgrade: int) ‑> BuildStockQuery
-