Module buildstock_query.tools
Expand source code
# Re-export UpgradesAnalyzer from the upgrades_analyzer sub-module at package level.
from .upgrades_analyzer import UpgradesAnalyzer
# UpgradesAnalyzer is the only name exported by a star-import of this package.
__all__ = ['UpgradesAnalyzer']
Sub-modules
buildstock_query.tools.characteristics_visualizer
buildstock_query.tools.logic_parser
buildstock_query.tools.upgrades_analyzer
buildstock_query.tools.upgrades_visualizer
Classes
class UpgradesAnalyzer (yaml_file: str, buildstock: Union[str, pandas.core.frame.DataFrame], opt_sat_file: str)
-
Analyze the apply logic for various upgrades in the project yaml file.
Initialize the analyzer instance.
Args
yaml_file
:str
- The path to the yaml file.
buildstock
:Union[str, pd.DataFrame]
- Either the buildstock dataframe, or path to the csv
opt_sat_file
:str
- The path to the option saturation file.
Expand source code
class UpgradesAnalyzer:
    """
    Analyze the apply logic for various upgrades in the project yaml file.

    An apply logic is a nested structure made of "Parameter|Option" strings, lists and
    single-key dicts whose key is one of "and", "or" or "not". It is evaluated against
    the (lower-cased) columns of the buildstock dataframe.
    """

    def __init__(self, yaml_file: str, buildstock: Union[str, pd.DataFrame], opt_sat_file: str) -> None:
        """
        Initialize the analyzer instance.

        Args:
            yaml_file (str): The path to the yaml file.
            buildstock (Union[str, pd.DataFrame]): Either the buildstock dataframe, or path to the csv
            opt_sat_file (str): The path to the option saturation file.
        """
        self.parser = LogicParser(opt_sat_file, yaml_file)
        self.yaml_file = yaml_file
        if isinstance(buildstock, str):
            # Everything is read as str so that option values compare as plain text.
            self.buildstock_df_original = read_csv(buildstock, dtype=str)
            self.buildstock_df = self.buildstock_df_original.copy()
            self.buildstock_df.columns = [c.lower() for c in self.buildstock_df.columns]
            self.buildstock_df.rename(columns={"building": "building_id"}, inplace=True)
            self.buildstock_df.set_index("building_id", inplace=True)
        elif isinstance(buildstock, pd.DataFrame):
            self.buildstock_df_original = buildstock.copy()
            self.buildstock_df = buildstock.reset_index().rename(columns=str.lower)
            self.buildstock_df.rename(columns={"building": "building_id"}, inplace=True)
            if "building_id" in self.buildstock_df.columns:
                self.buildstock_df.set_index("building_id", inplace=True)
            self.buildstock_df = self.buildstock_df.astype(str)
        self.total_samples = len(self.buildstock_df)
        # Memo of already reduced logic blocks, see _reduce_logic.
        self._logic_cache: dict = {}

    def get_cfg(self) -> dict:
        """Get the buildstock configuration file as a dictionary object.

        Returns:
            dict: The buildstock configuration file.
        """
        with open(self.yaml_file) as f:
            config = yaml.load(f, Loader=yaml.SafeLoader)
        return config

    @staticmethod
    def _get_eq_str(condition):
        """Turn a "Parameter|Option" condition into an expression usable with DataFrame.eval."""
        para, option = UpgradesAnalyzer._get_para_option(condition)
        return f"`{para.lower()}`=='{option}'"

    @staticmethod
    def _get_para_option(condition):
        """Split a "Parameter|Option" condition into (lower-cased parameter, option).

        Raises:
            ValueError: If the condition does not contain exactly one "|" separator.
        """
        try:
            para, option = condition.split("|")
        except ValueError as e:
            raise ValueError(f"Condition {condition} is invalid") from e
        return para.lower(), option

    @staticmethod
    def get_mentioned_parameters(logic: Union[list, dict, str]) -> list:
        """
        Returns the list of all parameters referenced in a logic block. Useful for debugging

        Args:
            logic ( Union[list, dict, str]): The apply logic

        Raises:
            ValueError: If the input logic is invalid

        Returns:
            List: The list of parameters
        """
        if not logic:
            return []

        if isinstance(logic, str):
            return [UpgradesAnalyzer._get_para_option(logic)[0]]
        elif isinstance(logic, list):
            all_params = []
            for el in logic:
                all_params.extend(UpgradesAnalyzer.get_mentioned_parameters(el))
            return list(dict.fromkeys(all_params))  # remove duplicates while maintaining order
        elif isinstance(logic, dict):
            # Only the first value is inspected (logic dicts are expected to have one key).
            return UpgradesAnalyzer.get_mentioned_parameters(list(logic.values())[0])
        else:
            raise ValueError("Invalid logic type")

    def print_unique_characteristic(self, upgrade_num: int, name: str, base_bldg_list: list, compare_bldg_list: list):
        """Finds and prints what's unique among a list of buildings compared to baseline buildings.
        Useful for debugging why a certain set of buildings' energy consumption went up for an upgrade, for example.

        Args:
            upgrade_num (int): The upgrade for which the analysis is being done.
            name (str): Some name to identify the building set (only used for printing)
            base_bldg_list (list): The set of 'normal' buildings id to compare against.
            compare_bldg_list (list): The set of buildings whose unique characteristics is to be printed.

        Raises:
            ValueError: If upgrade_num is not a valid 1-indexed upgrade number.
        """
        cfg = self.get_cfg()
        # Negative numbers would silently wrap around with list indexing, so reject them too.
        if upgrade_num <= 0:
            raise ValueError(f"Upgrades are 1-indexed. Got {upgrade_num}")

        try:
            upgrade_cfg = cfg["upgrades"][upgrade_num - 1]
        except (KeyError, IndexError) as e:
            # KeyError: no "upgrades" section; IndexError: upgrade number past the end of the list.
            raise ValueError(f"Invalid upgrade {upgrade_num}. Upgrades are 1-indexed, FYI.") from e

        parameter_list = []
        for option_cfg in upgrade_cfg["options"]:
            parameter_list.append(UpgradesAnalyzer._get_para_option(option_cfg["option"])[0])
            parameter_list.extend(UpgradesAnalyzer.get_mentioned_parameters(option_cfg.get("apply_logic")))
        res_df = self.buildstock_df
        # remove duplicates (dict.fromkeys) and remove parameters not existing in buildstock_df
        parameter_list = [param for param in dict.fromkeys(parameter_list) if param in res_df.columns]
        compare_df = res_df.loc[compare_bldg_list]
        base_df = res_df.loc[base_bldg_list]
        print(f"Comparing {len(compare_df)} buildings with {len(base_df)} other buildings.")
        # Maps a tuple of column names to the value tuples found only in the compared buildings.
        unique_vals_dict: dict[tuple[str, ...], set[tuple[str, ...]]] = {}
        for col in res_df.columns:
            no_change_set = set(compare_df[col].fillna("").unique())
            other_set = set(base_df[col].fillna("").unique())
            if only_in_no_change := no_change_set - other_set:
                print(f"Only {name} buildings have {col} in {sorted(only_in_no_change)}")
                unique_vals_dict[(col,)] = {(entry,) for entry in only_in_no_change}

        if not unique_vals_dict:
            print("No 1-column unique chracteristics found.")

        # Combinations are limited to at most 4 columns.
        for combi_size in range(2, min(len(parameter_list) + 1, 5)):
            print(f"Checking {combi_size} column combinations out of {parameter_list}")
            found_uniq_chars = 0
            for cols in combinations(parameter_list, combi_size):
                compare_tups = compare_df[list(cols)].fillna("").drop_duplicates().itertuples(index=False, name=None)
                other_tups = base_df[list(cols)].fillna("").drop_duplicates().itertuples(index=False, name=None)
                only_in_compare = set(compare_tups) - set(other_tups)

                # remove cases arisen out of uniqueness found earlier with smaller subset of cols
                for sub_combi_size in range(1, len(cols)):
                    for sub_cols in combinations(cols, sub_combi_size):
                        if sub_cols in unique_vals_dict:
                            new_set = set()
                            for val in only_in_compare:
                                relevant_val = tuple(val[cols.index(sub_col)] for sub_col in sub_cols)
                                if relevant_val not in unique_vals_dict[sub_cols]:
                                    new_set.add(val)
                            only_in_compare = new_set

                if only_in_compare:
                    print(f"Only {name} buildings have {cols} in {sorted(only_in_compare)} \n")
                    found_uniq_chars += 1
                    unique_vals_dict[cols] = only_in_compare

            if not found_uniq_chars:
                print(f"No {combi_size}-column unique chracteristics found.")

    def _reduce_logic(self, logic, parent=None):
        """Evaluate a logic block against the buildstock and return the boolean mask.

        Args:
            logic: A "Parameter|Option" string, a list of logic blocks or a single-key dict.
            parent: Key of the enclosing dict ("and", "or", "not") or None at the top level.
                Lists are or-ed when the parent is "or" and and-ed otherwise.

        Raises:
            ValueError: If parent is not one of None/"and"/"or"/"not", or a dict has several keys.
        """
        cache_key = str(logic) if parent is None else parent + "[" + str(logic) + "]"
        if cache_key in self._logic_cache:
            return self._logic_cache[cache_key]

        logic_array = np.ones((1, self.total_samples), dtype=bool)
        if parent not in [None, "and", "or", "not"]:
            raise ValueError(f"Logic can only inlcude and, or, not blocks. {parent} found in {logic}.")

        if isinstance(logic, str):
            para, opt = UpgradesAnalyzer._get_para_option(logic)
            logic_array = self.buildstock_df[para] == opt
        elif isinstance(logic, list):
            if len(logic) == 1:
                logic_array = self._reduce_logic(logic[0]).copy()
            elif parent in ["or"]:
                logic_array = reduce(
                    lambda l1, l2: l1 | self._reduce_logic(l2),
                    logic,
                    np.zeros((1, self.total_samples), dtype=bool),
                )
            else:
                logic_array = reduce(
                    lambda l1, l2: l1 & self._reduce_logic(l2),
                    logic,
                    np.ones((1, self.total_samples), dtype=bool),
                )
        elif isinstance(logic, dict):
            if len(logic) > 1:
                raise ValueError(f"Dicts cannot have more than one keys. {logic} has.")
            key = list(logic.keys())[0]
            logic_array = self._reduce_logic(logic[key], parent=key).copy()

        if parent == "not":
            # Negated results are returned right away and never cached.
            return ~logic_array
        if not (isinstance(logic, str) or (isinstance(logic, list) and len(logic) == 1)):
            # Don't cache small logics - computing them again won't be too bad
            self._logic_cache[cache_key] = logic_array.copy()
        return logic_array

    def get_report(self, upgrade_num: Optional[int] = None) -> pd.DataFrame:
        """Analyses how many buildings various options in all the upgrades is going to apply to and returns
        a report in DataFrame format.

        Args:
            upgrade_num: Numeric index of upgrade (1-indexed). If None, all upgrades are assessed

        Raises:
            ValueError: If the yaml has no upgrades or upgrade_num is outside 1..number of upgrades.

        Returns:
            pd.DataFrame: The upgrade and options report. One row per option plus one "All" row
                (option_num -1) per upgrade.
        """

        def _get_records(indx, upgrade):
            """Build the report rows for one upgrade (indx is 0-based)."""
            records = []
            logger.info(f"Analyzing upgrade {indx + 1}")
            all_applied_bldgs = np.zeros((1, self.total_samples), dtype=bool)
            package_applied_bldgs = np.ones((1, self.total_samples), dtype=bool)
            if "package_apply_logic" in upgrade:
                pkg_flat_logic = UpgradesAnalyzer._normalize_lists(upgrade["package_apply_logic"])
                package_applied_bldgs = self._reduce_logic(pkg_flat_logic, parent=None)

            for opt_index, option in enumerate(upgrade["options"]):
                # An option without apply_logic applies everywhere (subject to the package logic).
                applied_bldgs = np.ones((1, self.total_samples), dtype=bool)
                if "apply_logic" in option:
                    flat_logic = UpgradesAnalyzer._normalize_lists(option["apply_logic"])
                    applied_bldgs &= self._reduce_logic(flat_logic, parent=None)

                applied_bldgs &= package_applied_bldgs
                count = applied_bldgs.sum()
                all_applied_bldgs |= applied_bldgs
                record = {
                    "upgrade": indx + 1,
                    "upgrade_name": upgrade["upgrade_name"],
                    "option_num": opt_index + 1,
                    "option": option["option"],
                    "applicable_to": count,
                    "applicable_percent": self._to_pct(count),
                    "applicable_buildings": set(self.buildstock_df.loc[applied_bldgs[0]].index),
                }
                records.append(record)

            count = all_applied_bldgs.sum()
            record = {
                "upgrade": indx + 1,
                "upgrade_name": upgrade["upgrade_name"],
                "option_num": -1,
                "option": "All",
                "applicable_to": count,
                "applicable_buildings": set(self.buildstock_df.loc[all_applied_bldgs[0]].index),
                "applicable_percent": self._to_pct(count),
            }
            records.append(record)
            return records

        cfg = self.get_cfg()
        self._logic_cache = {}
        if "upgrades" not in cfg:
            raise ValueError("The project yaml has no upgrades defined")

        max_upg = len(cfg["upgrades"]) + 1
        if upgrade_num is not None:
            # Valid numbers are 1 .. max_upg - 1; max_upg itself is already out of range.
            if upgrade_num <= 0 or upgrade_num >= max_upg:
                raise ValueError(f"Invalid upgrade {upgrade_num}. Valid upgrade_num = {list(range(1, max_upg))}.")

        records = []
        for indx, upgrade in enumerate(cfg["upgrades"]):
            if upgrade_num is None or upgrade_num == indx + 1:
                records += _get_records(indx, upgrade)

        report_df = pd.DataFrame.from_records(records)
        return report_df

    def get_upgraded_buildstock(self, upgrade_num):
        """Return a copy of the original buildstock with the options of an upgrade applied.

        Args:
            upgrade_num: The 1-indexed upgrade to apply.

        Raises:
            ValueError: If more rows differ from the baseline than the report says were upgraded.

        Returns:
            pd.DataFrame: The upgraded buildstock.
        """
        report_df = self.get_report(upgrade_num)
        upgrade_name = report_df["upgrade_name"].unique()[0]
        logger.info(f" * Upgraded buildstock for upgrade {upgrade_num} : {upgrade_name}")

        df = self.buildstock_df_original.copy()
        for _, row in report_df.iterrows():
            if row["option"] == "All":
                continue
            dimension, upgrade_option = row["option"].split("|")
            apply_logic = df["Building"].isin(row["applicable_buildings"])
            # apply upgrade
            df[dimension] = np.where(apply_logic, upgrade_option, df[dimension])

        # report
        cond = report_df["option"] == "All"
        n_total = len(self.buildstock_df_original)
        n_applied = report_df.loc[cond, "applicable_to"].iloc[0]
        n_applied_pct = report_df.loc[cond, "applicable_percent"].iloc[0]
        logger.info(
            f" Upgrade package has {len(report_df)-1} options and "
            f"was applied to {n_applied} / {n_total} dwelling units ( {n_applied_pct} % )"
        )

        # QC
        n_diff = len(self.buildstock_df_original.compare(df)) - n_applied
        if n_diff > 0:
            raise ValueError(
                f"Relative to baseline buildstock, upgraded buildstock has {n_diff} more rows "
                "of difference than reported."
            )
        elif n_diff < 0:
            logger.warning(
                f"Relative to baseline buildstock, upgraded buildstock has {-1*n_diff} fewer rows "
                "of difference than reported. This is okay, but indicates that some parameters are "
                "being upgraded to the same incumbent option (e.g., LEDs to LEDs). Check that this is intentional."
            )
        else:
            logger.info("No cases of parameter upgraded with incumbent option detected.")

        return df

    @staticmethod
    def _normalize_lists(logic, parent=None):
        """Any list that is not in a or block is considered to be in an and block.
        This block will normalize this pattern by adding "and" wherever required.

        Args:
            logic: Logic structure (dict, list etc)
            parent (optional): The parent of the current logic block. If it is a list, and there is no parent,
                the list will be wrapped in a and block.

        Returns:
            The normalized logic: single-element lists are unwrapped, parent-less lists are
            wrapped as {"and": [...]}, dict values are normalized recursively and anything
            else is returned unchanged.
        """
        if isinstance(logic, list):
            # If it is a single element list, just unwrap and return
            if len(logic) == 1:
                return UpgradesAnalyzer._normalize_lists(logic[0])
            new_logic = [UpgradesAnalyzer._normalize_lists(el) for el in logic]
            return {"and": new_logic} if parent is None else new_logic
        elif isinstance(logic, dict):
            new_dict = {key: UpgradesAnalyzer._normalize_lists(value, parent=key) for key, value in logic.items()}
            return new_dict
        else:
            return logic

    def _get_options_application_count_report(self, logic_dict) -> Optional[pd.DataFrame]:
        """
        For a given logic dictionary, this method will return a report df of options application.
        Returns None when there are fewer than two entries or no option applies to any building.
        Example report below:

                           Applied options   Applied buildings   Cumulative sub   Cumulative all
        Number of options
        4                  1, 10, 13, 14     75 (0.1%)           75 (0.1%)        75 (0.1%)
        4                  1, 11, 13, 14     2279 (2.3%)         2354 (2.4%)      2354 (2.4%)
        4                  1, 12, 13, 14     309 (0.3%)          2663 (2.7%)      2663 (2.7%)
        5                  1, 2, 3, 13, 14   8 (0.0%)            8 (0.0%)         2671 (2.7%)
        5                  1, 2, 4, 13, 14   158 (0.2%)          166 (0.2%)       2829 (2.8%)
        5                  1, 2, 5, 13, 14   65 (0.1%)           231 (0.2%)       2894 (2.9%)
        5                  1, 6, 7, 13, 14   23 (0.0%)           254 (0.3%)       2917 (2.9%)
        5                  1, 6, 8, 13, 14   42 (0.0%)           296 (0.3%)       2959 (3.0%)
        """
        n_options = len(logic_dict)
        if n_options < 2:
            return None

        logic_df = pd.DataFrame(logic_dict)
        nbldgs = len(logic_df)
        # For every building, the tuple of 1-based column positions whose logic is true.
        opts2count = logic_df.apply(lambda row: tuple(indx+1 for indx, val in enumerate(row) if val),
                                    axis=1).value_counts().to_dict()
        cum_count_all = 0
        cum_count = defaultdict(int)
        application_report_rows = []
        for applied_opts in sorted(opts2count.keys(), key=lambda x: (len(x), x)):
            num_opt = len(applied_opts)
            if num_opt == 0:
                continue
            n_applied_bldgs = opts2count[applied_opts]
            cum_count_all += n_applied_bldgs
            cum_count[num_opt] += n_applied_bldgs
            record = {"Number of options": num_opt,
                      "Applied options": ", ".join([f"{logic_df.columns[opt - 1]}" for opt in applied_opts]),
                      "Applied buildings": f"{n_applied_bldgs} ({self._to_pct(n_applied_bldgs, nbldgs)}%)",
                      "Cumulative sub": f"{cum_count[num_opt]} ({self._to_pct(cum_count[num_opt], nbldgs)}%)",
                      "Cumulative all": f"{cum_count_all} ({self._to_pct(cum_count_all, nbldgs)}%)"
                      }
            application_report_rows.append(record)

        assert cum_count_all <= nbldgs, "Cumulative count of options applied is more than total number of buildings."
        if application_report_rows:
            application_report_df = pd.DataFrame(application_report_rows).set_index("Number of options")
            return application_report_df
        return None

    def _get_left_out_report_all(self, upgrade_num):
        """Build the left-out report for a whole upgrade.

        The buildings left out are those matching
        not((or of every option's apply_logic) and package_apply_logic).

        Returns:
            (logic array of the left-out buildings, report str)
        """
        cfg = self.get_cfg()
        report_str = ""
        upgrade = cfg["upgrades"][upgrade_num - 1]
        ugrade_name = upgrade.get("upgrade_name")
        header = f"Left Out Report for - Upgrade{upgrade_num}:'{ugrade_name}'"
        report_str += "-" * len(header) + "\n"
        report_str += header + "\n"
        report_str += "-" * len(header) + "\n"
        logic = {"or": []}
        for opt in upgrade["options"]:
            if "apply_logic" in opt:
                logic["or"].append(self._normalize_lists(opt["apply_logic"]))
        if "package_apply_logic" in upgrade:
            logic = {"and": [logic, upgrade["package_apply_logic"]]}
        logic = {"not": logic}  # invert it
        logic = self.parser.normalize_logic(logic)
        logic_array, logic_str = self._get_logic_report(logic)
        footer_len = len(logic_str[-1])
        report_str += "\n".join(logic_str) + "\n"
        report_str += "-" * footer_len + "\n"
        count = logic_array.sum()
        footer_str = f"Overall Not Applied to => {count} ({self._to_pct(count)}%)."
        report_str += footer_str + "\n"
        report_str += "-" * len(footer_str) + "\n"
        return logic_array, report_str

    def get_left_out_report(self, upgrade_num: int, option_num: Optional[int] = None) -> tuple[np.ndarray, str]:
        """Builds the left-out report for a particular upgrade (and optionally, an option), i.e. the
        report of the buildings to which the upgrade/option is NOT applied.

        Args:
            upgrade_num (int): The 1-indexed upgrade for which to build the report.
            option_num (int, optional): The 1-indexed option number for which to build the report.
                Defaults to None, which will build the report for all options.

        Raises:
            ValueError: If upgrade_num or option_num does not exist in the yaml.

        Returns:
            (np.ndarray, str): Returns a logic array of the buildings left out and the report str.
        """
        cfg = self.get_cfg()
        n_upgrades = len(cfg.get("upgrades") or [])
        # Valid upgrade numbers are 1 .. n_upgrades.
        if upgrade_num <= 0 or upgrade_num > n_upgrades:
            raise ValueError(f"Invalid upgrade {upgrade_num}. Upgrade num is 1-indexed.")

        if option_num is None:
            return self._get_left_out_report_all(upgrade_num)

        self._logic_cache = {}
        # Negative numbers would silently pick an option from the end of the list.
        if option_num <= 0:
            raise ValueError(f"Upgrades and options are 1-indexed.Got {upgrade_num} {option_num}")
        report_str = ""
        try:
            upgrade = cfg["upgrades"][upgrade_num - 1]
            opt = upgrade["options"][option_num - 1]
        except (KeyError, IndexError, TypeError) as e:
            raise ValueError(f"The yaml doesn't have {upgrade_num}/{option_num} upgrade/option") from e

        ugrade_name = upgrade.get("upgrade_name")
        header = f"Left Out Report for - Upgrade{upgrade_num}:'{ugrade_name}', Option{option_num}:'{opt['option']}'"
        report_str += "-" * len(header) + "\n"
        report_str += header + "\n"
        report_str += "-" * len(header) + "\n"
        if "apply_logic" in opt and "package_apply_logic" in upgrade:
            logic = {"not": {"and": [opt["apply_logic"], upgrade["package_apply_logic"]]}}
        elif "apply_logic" in opt:
            logic = {"not": opt["apply_logic"]}
        elif "package_apply_logic" in upgrade:
            logic = {"not": upgrade["package_apply_logic"]}
        else:
            logic = None

        if logic is None:
            # Without any apply logic the option applies to every building: nothing is left out.
            logic_array = np.zeros((1, self.total_samples), dtype=bool)
            note = "No apply logic defined; the option applies to all buildings."
            report_str += note + "\n"
            report_str += "-" * len(note) + "\n"
        else:
            logic = self.parser.normalize_logic(logic)
            logic_array, logic_str = self._get_logic_report(logic)
            footer_len = len(logic_str[-1])
            report_str += "\n".join(logic_str) + "\n"
            report_str += "-" * footer_len + "\n"
        count = logic_array.sum()
        footer_str = f"Overall Not Applied to => {count} ({self._to_pct(count)}%)."
        report_str += footer_str + "\n"
        report_str += "-" * len(footer_str) + "\n"
        return logic_array, report_str

    def get_detailed_report(self, upgrade_num: int, option_num: Optional[int] = None,
                            normalize_logic: bool = False) -> tuple[np.ndarray, str]:
        """Builds the detailed report for a particular upgrade (and optionally, an option)

        Args:
            upgrade_num (int): The 1-indexed upgrade for which to build the report.
            option_num (int, optional): The 1-indexed option number for which to build the report.
                Defaults to None, which will build the report for all options.
            normalize_logic (bool, optional): Whether to normalize the logic structure. Defaults to False.

        Raises:
            ValueError: If upgrade_num or option_num does not exist in the yaml.

        Returns:
            (np.ndarray, str): Returns a logic array of buildings to which the option (or, without
                option_num, any of the options) applied and the report str.
        """
        cfg = self.get_cfg()
        n_upgrades = len(cfg.get("upgrades") or [])
        # Valid upgrade numbers are 1 .. n_upgrades.
        if upgrade_num <= 0 or upgrade_num > n_upgrades:
            raise ValueError(f"Invalid upgrade {upgrade_num}. Upgrade num is 1-indexed.")

        if option_num is None:
            return self._get_detailed_report_all(upgrade_num, normalize_logic=normalize_logic)

        self._logic_cache = {}
        # Negative numbers would silently pick an option from the end of the list.
        if option_num <= 0:
            raise ValueError(f"Upgrades and options are 1-indexed.Got {upgrade_num} {option_num}")
        report_str = ""
        try:
            upgrade = cfg["upgrades"][upgrade_num - 1]
            opt = upgrade["options"][option_num - 1]
        except (KeyError, IndexError, TypeError) as e:
            raise ValueError(f"The yaml doesn't have {upgrade_num}/{option_num} upgrade/option") from e

        ugrade_name = upgrade.get("upgrade_name")
        header = f"Option Apply Report for - Upgrade{upgrade_num}:'{ugrade_name}', Option{option_num}:'{opt['option']}'"
        report_str += "-" * len(header) + "\n"
        report_str += header + "\n"
        report_str += "-" * len(header) + "\n"
        if "apply_logic" in opt:
            logic = UpgradesAnalyzer._normalize_lists(opt["apply_logic"])
            logic = self.parser.normalize_logic(logic) if normalize_logic else logic
            logic_array, logic_str = self._get_logic_report(logic)
            footer_len = len(logic_str[-1])
            report_str += "\n".join(logic_str) + "\n"
            report_str += "-" * footer_len + "\n"
        else:
            # No apply logic: the option applies to every building.
            logic_array = np.ones((1, self.total_samples), dtype=bool)

        if "package_apply_logic" in upgrade:
            logic = UpgradesAnalyzer._normalize_lists(upgrade["package_apply_logic"])
            logic = self.parser.normalize_logic(logic) if normalize_logic else logic
            package_logic_array, logic_str = self._get_logic_report(logic)
            footer_len = len(logic_str[-1])
            report_str += "Package Apply Logic Report" + "\n"
            report_str += "--------------------------" + "\n"
            report_str += "\n".join(logic_str) + "\n"
            report_str += "-" * footer_len + "\n"
            logic_array = logic_array & package_logic_array

        count = logic_array.sum()
        footer_str = f"Overall applied to => {count} ({self._to_pct(count)}%)."
        report_str += footer_str + "\n"
        report_str += "-" * len(footer_str) + "\n"
        return logic_array, report_str

    def _get_detailed_report_all(self, upgrade_num, normalize_logic: bool = False):
        """Concatenate the detailed reports of every option of an upgrade and append the summaries.

        Returns:
            (logic array of buildings to which any option applied, report str)
        """
        conds_dict = {}  # 1-based option number -> logic array
        grouped_conds_dict = {}  # parameter name -> or-ed logic array of its options
        cfg = self.get_cfg()
        report_str = ""
        options = cfg["upgrades"][upgrade_num - 1]["options"]
        n_options = len(options)
        or_array = np.zeros((1, self.total_samples), dtype=bool)
        and_array = np.ones((1, self.total_samples), dtype=bool)
        for option_indx, option in enumerate(options):
            logic_array, sub_report_str = self.get_detailed_report(upgrade_num, option_indx + 1,
                                                                   normalize_logic=normalize_logic)
            opt_name, _ = self._get_para_option(option["option"])
            report_str += sub_report_str + "\n"
            conds_dict[option_indx + 1] = logic_array
            if opt_name not in grouped_conds_dict:
                # Copy: the in-place |= below must not alter the per-option entry of conds_dict.
                grouped_conds_dict[opt_name] = logic_array.copy()
            else:
                grouped_conds_dict[opt_name] |= logic_array
            or_array |= logic_array
            and_array &= logic_array
        and_count = and_array.sum()
        or_count = or_array.sum()
        report_str += f"All of the options (and-ing) were applied to: {and_count} ({self._to_pct(and_count)}%)" + "\n"
        report_str += f"Any of the options (or-ing) were applied to: {or_count} ({self._to_pct(or_count)}%)" + "\n"

        option_app_report = self._get_options_application_count_report(grouped_conds_dict)
        if option_app_report is not None:
            report_str += "-" * 80 + "\n"
            report_str += f"Report of how the {len(grouped_conds_dict)} options were applied to the buildings." + "\n"
            report_str += tabulate(option_app_report, headers='keys', tablefmt='grid', maxcolwidths=50) + "\n"

        detailed_app_report_df = self._get_options_application_count_report(conds_dict)
        if detailed_app_report_df is not None:
            report_str += "-" * 80 + "\n"
            if len(detailed_app_report_df) > 100:
                report_str += "Detailed report is skipped because of too many rows. " + "\n"
                report_str += "Ask the developer if this is useful to see" + "\n"
            else:
                report_str += f"Detailed report of how the {n_options} options were applied to the buildings." + "\n"
                # Tabulate the per-option table (not the grouped one printed above).
                report_str += tabulate(detailed_app_report_df, headers='keys', tablefmt='grid',
                                       maxcolwidths=50) + "\n"
        return or_array, report_str

    def _to_pct(self, count, total=None):
        """Return count as a percentage of total (default: total number of samples), 1 decimal."""
        total = total or self.total_samples
        if not total:
            # Empty buildstock: avoid dividing by zero.
            return 0.0
        return round(100 * count / total, 1)

    def _get_logic_report(self, logic, parent=None):
        """Evaluate a logic block and describe, line by line, how many buildings each part matches.

        Args:
            logic: A "Parameter|Option" string, a list of logic blocks or a single-key dict.
            parent: Key of the enclosing dict ("and", "or", "not") or None at the top level.

        Raises:
            ValueError: If parent is not one of None/"and"/"or"/"not", or a dict has several keys.

        Returns:
            (logic array, list of report lines)
        """
        logic_array = np.ones((1, self.total_samples), dtype=bool)
        logic_str = [""]
        if parent not in [None, "and", "or", "not"]:
            raise ValueError(f"Logic can only include and, or, not blocks. {parent} found in {logic}.")
        if isinstance(logic, str):
            logic_condition = UpgradesAnalyzer._get_eq_str(logic)
            logic_array = self.buildstock_df.eval(logic_condition, engine="python")
            count = logic_array.sum()
            logic_str = [logic + " => " + f"{count} ({self._to_pct(count)}%)"]
        elif isinstance(logic, list):
            if len(logic) == 1:
                logic_array, logic_str = self._get_logic_report(logic[0])
            elif parent in ["or"]:

                def reducer(l1, l2):
                    ll2 = self._get_logic_report(l2)
                    return l1[0] | ll2[0], l1[1] + ll2[1]

                logic_array, logic_str = reduce(reducer, logic, (np.zeros((1, self.total_samples), dtype=bool), []))
            else:

                def reducer(l1, l2):
                    ll2 = self._get_logic_report(l2)
                    return l1[0] & ll2[0], l1[1] + ll2[1]

                logic_array, logic_str = reduce(reducer, logic, (np.ones((1, self.total_samples), dtype=bool), []))
        elif isinstance(logic, dict):
            if len(logic) > 1:
                raise ValueError(f"Dicts cannot have more than one keys. {logic} has.")
            key = list(logic.keys())[0]
            sub_logic = self._get_logic_report(logic[key], parent=key)
            sub_logic_str = sub_logic[1]
            logic_array = sub_logic[0]
            if key == "not":
                logic_array = ~logic_array
            count = logic_array.sum()
            header_str = key + " => " + f"{count} ({self._to_pct(count)}%)"
            # Lines of the nested block are indented under the header line.
            logic_str = [header_str] + [f" {ls}" for ls in sub_logic_str]

        count = logic_array.sum()
        if parent is None and isinstance(logic, list) and len(logic) > 1:
            logic_str[0] = logic_str[0] + " => " + f"{count} ({self._to_pct(count)}%)"

        return logic_array, logic_str

    def save_detailed_report_all(self, file_path: str, logic_transform=None):
        """Save detailed text based upgrade report.

        Args:
            file_path (str): Output file.
            logic_transform (optional): Forwarded (by truthiness) as the normalize_logic flag of
                get_detailed_report. Defaults to None, i.e. no normalization.
        """
        cfg = self.get_cfg()
        all_report = ""
        for upgrade in range(1, len(cfg["upgrades"]) + 1):
            logger.info(f"Getting report for upgrade {upgrade}")
            _, report = self.get_detailed_report(upgrade, normalize_logic=bool(logic_transform))
            all_report += report + "\n"
        with open(file_path, "w") as file:
            file.write(all_report)
Static methods
def get_mentioned_parameters(logic: Union[list, dict, str]) ‑> list
-
Returns the list of all parameters referenced in a logic block. Useful for debugging
Args
logic
:Union[list, dict, str]
- The apply logic
Raises
ValueError
- If the input logic is invalid
Returns
List
- The list of parameters
Expand source code
@staticmethod def get_mentioned_parameters(logic: Union[list, dict, str]) -> list: """ Returns the list of all parameters referenced in a logic block. Useful for debugging Args: logic ( Union[list, dict, str]): The apply logic Raises: ValueError: If the input logic is invalid Returns: List: The list of parameters """ if not logic: return [] if isinstance(logic, str): return [UpgradesAnalyzer._get_para_option(logic)[0]] elif isinstance(logic, list): all_params = [] for el in logic: all_params.extend(UpgradesAnalyzer.get_mentioned_parameters(el)) return list(dict.fromkeys(all_params)) # remove duplicates while maintainig order elif isinstance(logic, dict): return UpgradesAnalyzer.get_mentioned_parameters(list(logic.values())[0]) else: raise ValueError("Invalid logic type")
Methods
def get_cfg(self) ‑> dict
-
Get the buildstock configuration file as a dictionary object.
Returns
dict
- The buildstock configuration file.
Expand source code
def get_cfg(self) -> dict:
    """Read the buildstock configuration yaml and return it as a dictionary.

    The file at self.yaml_file is parsed with the yaml safe loader on every call.

    Returns:
        dict: The buildstock configuration file.
    """
    with open(self.yaml_file) as yaml_stream:
        return yaml.safe_load(yaml_stream)
def get_detailed_report(self, upgrade_num: int, option_num: Optional[int] = None, normalize_logic: bool = False) ‑> tuple[numpy.ndarray, str]
-
Prints detailed report for a particular upgrade (and optionally, an option)
Args
upgrade_num
:int
- The 1-indexed upgrade for which to print the report.
option_num
:int
, optional- The 1-indexed option number for which to print report. Defaults to None, which will print report for all options.
normalize_logic
:bool
, optional- Whether to normalize the logic structure. Defaults to False.
Returns
(np.ndarray, str): Returns a logic array of the buildings to which the option (or, when no option is given, any of the options) applies, and the report str.
Expand source code
def get_detailed_report(self, upgrade_num: int, option_num: Optional[int] = None, normalize_logic: bool = False) -> tuple[np.ndarray, str]: """Prints detailed report for a particular upgrade (and optionally, an option) Args: upgrade_num (int): The 1-indexed upgrade for which to print the report. option_num (int, optional): The 1-indexed option number for which to print report. Defaults to None, which will print report for all options. normalize_logic (bool, optional): Whether to normalize the logic structure. Defaults to False. Returns: (np.ndarray, str): Returns a logic array of buildings to which the any of the option applied and report str. """ cfg = self.get_cfg() if upgrade_num <= 0 or upgrade_num > len(cfg["upgrades"]) + 1: raise ValueError(f"Invalid upgrade {upgrade_num}. Upgrade num is 1-indexed.") if option_num is None: return self._get_detailed_report_all(upgrade_num, normalize_logic=normalize_logic) self._logic_cache = {} if upgrade_num == 0 or option_num == 0: raise ValueError(f"Upgrades and options are 1-indexed.Got {upgrade_num} {option_num}") report_str = "" try: upgrade = cfg["upgrades"][upgrade_num - 1] opt = upgrade["options"][option_num - 1] except (KeyError, IndexError, TypeError) as e: raise ValueError(f"The yaml doesn't have {upgrade_num}/{option_num} upgrade/option") from e ugrade_name = upgrade.get("upgrade_name") header = f"Option Apply Report for - Upgrade{upgrade_num}:'{ugrade_name}', Option{option_num}:'{opt['option']}'" report_str += "-" * len(header) + "\n" report_str += header + "\n" report_str += "-" * len(header) + "\n" if "apply_logic" in opt: logic = UpgradesAnalyzer._normalize_lists(opt["apply_logic"]) logic = self.parser.normalize_logic(logic) if normalize_logic else logic logic_array, logic_str = self._get_logic_report(logic) footer_len = len(logic_str[-1]) report_str += "\n".join(logic_str) + "\n" report_str += "-" * footer_len + "\n" else: logic_array = np.ones((1, self.total_samples), dtype=bool) if "package_apply_logic" in 
upgrade: logic = UpgradesAnalyzer._normalize_lists(upgrade["package_apply_logic"]) logic = self.parser.normalize_logic(logic) if normalize_logic else logic package_logic_array, logic_str = self._get_logic_report(logic) footer_len = len(logic_str[-1]) report_str += "Package Apply Logic Report" + "\n" report_str += "--------------------------" + "\n" report_str += "\n".join(logic_str) + "\n" report_str += "-" * footer_len + "\n" logic_array = logic_array & package_logic_array count = logic_array.sum() footer_str = f"Overall applied to => {count} ({self._to_pct(count)}%)." report_str += footer_str + "\n" report_str += "-" * len(footer_str) + "\n" return logic_array, report_str
def get_left_out_report(self, upgrade_num: int, option_num: Optional[int] = None) ‑> tuple[numpy.ndarray, str]
-
Builds the left-out report for a particular upgrade (and optionally, an option), i.e. the report of the buildings to which the upgrade/option is not applied.
Args
upgrade_num
:int
- The 1-indexed upgrade for which to print the report.
option_num
:int
, optional- The 1-indexed option number for which to print report. Defaults to None, which will print report for all options.
normalize_logic
:bool
, optional- Whether to normalize the logic structure. Defaults to False.
Returns
(np.ndarray, str): Returns a logic array of the buildings that are left out (those to which the upgrade/option is not applied) and the report str.
Expand source code
def get_left_out_report(self, upgrade_num: int, option_num: Optional[int] = None) -> tuple[np.ndarray, str]:
    """Build a report of the buildings an upgrade (and optionally, one of its options) leaves out.

    The left-out set is the negation of the option's apply logic combined (AND) with the
    upgrade's package apply logic, whichever of the two are present. The negated logic is
    always normalized before the report is built.

    Args:
        upgrade_num (int): The 1-indexed upgrade for which to build the report.
        option_num (int, optional): The 1-indexed option number for which to build the report.
            Defaults to None, in which case the call is delegated to `_get_left_out_report_all`.

    Returns:
        (np.ndarray, str): A boolean array marking the buildings to which the option is NOT
        applied, and the report str.

    Raises:
        ValueError: If upgrade_num or option_num is not a valid 1-indexed position in the yaml.
    """
    cfg = self.get_cfg()
    # Valid upgrades are 1..len(upgrades); anything else must not reach the indexing below.
    if upgrade_num <= 0 or upgrade_num > len(cfg["upgrades"]):
        raise ValueError(f"Invalid upgrade {upgrade_num}. Upgrade num is 1-indexed.")

    if option_num is None:
        return self._get_left_out_report_all(upgrade_num)

    self._logic_cache = {}
    # Reject zero AND negative option numbers: `option_num - 1` would otherwise wrap around
    # and silently report on an option counted from the end of the list.
    if option_num <= 0:
        raise ValueError(f"Upgrades and options are 1-indexed.Got {upgrade_num} {option_num}")

    try:
        upgrade = cfg["upgrades"][upgrade_num - 1]
        opt = upgrade["options"][option_num - 1]
    except (KeyError, IndexError, TypeError) as e:
        raise ValueError(f"The yaml doesn't have {upgrade_num}/{option_num} upgrade/option") from e

    upgrade_name = upgrade.get("upgrade_name")
    header = f"Left Out Report for - Upgrade{upgrade_num}:'{upgrade_name}', Option{option_num}:'{opt['option']}'"
    report_lines = ["-" * len(header), header, "-" * len(header)]

    has_option_logic = "apply_logic" in opt
    has_package_logic = "package_apply_logic" in upgrade
    if has_option_logic and has_package_logic:
        logic = {"not": {"and": [opt["apply_logic"], upgrade["package_apply_logic"]]}}
    elif has_option_logic:
        logic = {"not": opt["apply_logic"]}
    elif has_package_logic:
        logic = {"not": upgrade["package_apply_logic"]}
    else:
        logic = None

    if logic is None:
        # With no apply logic at all the option applies to every building, so nothing is
        # left out. (Previously this case crashed with a KeyError.)
        logic_array = np.zeros((1, self.total_samples), dtype=bool)
    else:
        logic = self.parser.normalize_logic(logic)
        logic_array, logic_str = self._get_logic_report(logic)
        # The closing rule is sized to the last line of the logic report.
        footer_len = len(logic_str[-1])
        report_lines.extend(logic_str)
        report_lines.append("-" * footer_len)

    count = logic_array.sum()
    footer_str = f"Overall Not Applied to => {count} ({self._to_pct(count)}%)."
    report_lines.append(footer_str)
    report_lines.append("-" * len(footer_str))
    # Every report line, including the last, is newline-terminated.
    report_str = "\n".join(report_lines) + "\n"
    return logic_array, report_str
def get_report(self, upgrade_num: Optional[int] = None) ‑> pandas.core.frame.DataFrame
-
Analyses how many buildings the various options in all the upgrades are going to apply to, and returns a report in DataFrame format.
Args
upgrade_num
- Numeric index of upgrade (1-indexed). If None, all upgrades are assessed
Returns
pd.DataFrame
- The upgrade and options report.
Expand source code
def get_report(self, upgrade_num: Optional[int] = None) -> pd.DataFrame:
    """Analyse how many buildings each option of the upgrades applies to.

    For every assessed upgrade, one record is produced per option, followed by a summary
    record (option "All", option_num -1) covering the buildings that receive at least one
    option of that upgrade.

    Args:
        upgrade_num: Numeric index of upgrade (1-indexed). If None, all upgrades are assessed.

    Returns:
        pd.DataFrame: The upgrade and options report, with the columns upgrade, upgrade_name,
        option_num, option, applicable_to, applicable_percent and applicable_buildings.

    Raises:
        ValueError: If the yaml defines no upgrades, or upgrade_num is out of range.
    """

    def _get_records(indx, upgrade):
        """Build the per-option records plus the "All" summary record for one upgrade."""
        records = []
        logger.info(f"Analyzing upgrade {indx + 1}")
        all_applied_bldgs = np.zeros((1, self.total_samples), dtype=bool)
        package_applied_bldgs = np.ones((1, self.total_samples), dtype=bool)
        if "package_apply_logic" in upgrade:
            pkg_flat_logic = UpgradesAnalyzer._normalize_lists(upgrade["package_apply_logic"])
            package_applied_bldgs = self._reduce_logic(pkg_flat_logic, parent=None)

        for opt_index, option in enumerate(upgrade["options"]):
            # Start from a fresh all-True array and AND into it in place, so whatever array
            # `_reduce_logic` hands back (possibly a shared/cached one - not visible here)
            # is never mutated.
            applied_bldgs = np.ones((1, self.total_samples), dtype=bool)
            if "apply_logic" in option:
                flat_logic = UpgradesAnalyzer._normalize_lists(option["apply_logic"])
                applied_bldgs &= self._reduce_logic(flat_logic, parent=None)
            applied_bldgs &= package_applied_bldgs
            count = applied_bldgs.sum()
            all_applied_bldgs |= applied_bldgs
            records.append(
                {
                    "upgrade": indx + 1,
                    "upgrade_name": upgrade["upgrade_name"],
                    "option_num": opt_index + 1,
                    "option": option["option"],
                    "applicable_to": count,
                    "applicable_percent": self._to_pct(count),
                    "applicable_buildings": set(self.buildstock_df.loc[applied_bldgs[0]].index),
                }
            )

        count = all_applied_bldgs.sum()
        records.append(
            {
                "upgrade": indx + 1,
                "upgrade_name": upgrade["upgrade_name"],
                "option_num": -1,
                "option": "All",
                "applicable_to": count,
                "applicable_buildings": set(self.buildstock_df.loc[all_applied_bldgs[0]].index),
                "applicable_percent": self._to_pct(count),
            }
        )
        return records

    cfg = self.get_cfg()
    self._logic_cache = {}
    if "upgrades" not in cfg:
        raise ValueError("The project yaml has no upgrades defined")

    n_upgrades = len(cfg["upgrades"])
    # Valid upgrade numbers are 1..n_upgrades inclusive, matching the range quoted in the message.
    if upgrade_num is not None and not 1 <= upgrade_num <= n_upgrades:
        raise ValueError(
            f"Invalid upgrade {upgrade_num}. Valid upgrade_num = {list(range(1, n_upgrades + 1))}."
        )

    records = []
    for indx, upgrade in enumerate(cfg["upgrades"]):
        if upgrade_num is None or upgrade_num == indx + 1:
            records += _get_records(indx, upgrade)
    report_df = pd.DataFrame.from_records(records)
    return report_df
def get_upgraded_buildstock(self, upgrade_num)
-
Expand source code
def get_upgraded_buildstock(self, upgrade_num):
    """Return a copy of the original buildstock with the options of one upgrade applied.

    Each option in the upgrade's report is read as "<column>|<new value>"; the named column
    is overwritten with the new value for every row whose "Building" entry is among the
    buildings the report lists for that option. The number of changed rows is then checked
    against the "All" summary row of the report.

    Args:
        upgrade_num: The upgrade number, passed through to `get_report`.

    Returns:
        The upgraded copy of `buildstock_df_original`.

    Raises:
        ValueError: If more rows differ from the baseline than the report says were upgraded.
    """
    report_df = self.get_report(upgrade_num)
    upgrade_name = report_df["upgrade_name"].unique()[0]
    logger.info(f" * Upgraded buildstock for upgrade {upgrade_num} : {upgrade_name}")

    baseline_df = self.buildstock_df_original
    upgraded_df = baseline_df.copy()

    # The "All" row is the per-upgrade summary, not an option to apply.
    is_summary_row = report_df["option"] == "All"
    option_rows = report_df.loc[~is_summary_row]
    for option_str, building_ids in zip(option_rows["option"], option_rows["applicable_buildings"]):
        dimension, upgrade_option = option_str.split("|")
        receives_upgrade = upgraded_df["Building"].isin(building_ids)
        upgraded_df[dimension] = np.where(receives_upgrade, upgrade_option, upgraded_df[dimension])

    # report
    summary_rows = report_df.loc[is_summary_row]
    n_total = len(baseline_df)
    n_applied = summary_rows["applicable_to"].iloc[0]
    n_applied_pct = summary_rows["applicable_percent"].iloc[0]
    n_options = len(report_df) - 1
    logger.info(
        f" Upgrade package has {n_options} options and "
        f"was applied to {n_applied} / {n_total} dwelling units ( {n_applied_pct} % )"
    )

    # QC: rows that actually changed vs. rows the report says were upgraded
    n_changed_rows = len(baseline_df.compare(upgraded_df))
    n_diff = n_changed_rows - n_applied
    if n_diff == 0:
        logger.info("No cases of parameter upgraded with incumbent option detected.")
    elif n_diff < 0:
        logger.warning(
            f"Relative to baseline buildstock, upgraded buildstock has {-1*n_diff} fewer rows "
            "of difference than reported. This is okay, but indicates that some parameters are "
            "being upgraded to the same incumbent option (e.g., LEDs to LEDs). Check that this is intentional."
        )
    else:
        raise ValueError(
            f"Relative to baseline buildstock, upgraded buildstock has {n_diff} more rows "
            "of difference than reported."
        )

    return upgraded_df
def print_unique_characteristic(self, upgrade_num: int, name: str, base_bldg_list: list, compare_bldg_list: list)
-
Finds and prints what's unique among a list of buildings compared to baseline buildings. Useful for debugging why a certain set of buildings' energy consumption went up for an upgrade, for example.
Args
upgrade_num
:int
- The upgrade for which the analysis is being done.
name
:str
- Some name to identify the building set (only used for printing)
base_bldg_list
:list
- The set of 'normal' building ids to compare against.
compare_bldg_list
:list
- The set of buildings whose unique characteristics are to be printed.
Expand source code
def print_unique_characteristic(self, upgrade_num: int, name: str, base_bldg_list: list, compare_bldg_list: list):
    """Find and print what is unique among a list of buildings compared to baseline buildings.

    Useful for debugging why a certain set of buildings' energy consumption went up for an
    upgrade, for example. Single-column uniqueness is checked over every buildstock column;
    combinations of 2 to 4 columns are checked only over the parameters the upgrade's
    options and their apply logic mention.

    Args:
        upgrade_num (int): The 1-indexed upgrade for which the analysis is being done.
        name (str): Some name to identify the building set (only used for printing).
        base_bldg_list (list): The set of 'normal' building ids to compare against.
        compare_bldg_list (list): The set of buildings whose unique characteristics are to be printed.

    Raises:
        ValueError: If upgrade_num is not a valid 1-indexed upgrade in the yaml.
    """
    cfg = self.get_cfg()
    # Reject negatives as well as zero: a negative number would wrap around the upgrades list.
    if upgrade_num <= 0:
        raise ValueError(f"Upgrades are 1-indexed. Got {upgrade_num}")
    try:
        upgrade_cfg = cfg["upgrades"][upgrade_num - 1]
    except (KeyError, IndexError) as e:
        # KeyError: no "upgrades" key in the yaml; IndexError: upgrade_num past the end of the list.
        raise ValueError(f"Invalid upgrade {upgrade_num}. Upgrades are 1-indexed, FYI.") from e

    parameter_list = []
    for option_cfg in upgrade_cfg["options"]:
        parameter_list.append(UpgradesAnalyzer._get_para_option(option_cfg["option"])[0])
        parameter_list.extend(UpgradesAnalyzer.get_mentioned_parameters(option_cfg.get("apply_logic")))
    res_df = self.buildstock_df
    # remove duplicates (dict.fromkeys) and remove parameters not existing in buildstock_df
    parameter_list = [param for param in dict.fromkeys(parameter_list) if param in res_df.columns]
    compare_df = res_df.loc[compare_bldg_list]
    base_df = res_df.loc[base_bldg_list]
    print(f"Comparing {len(compare_df)} buildings with {len(base_df)} other buildings.")

    # Maps a tuple of column names to the value tuples seen only in the compared buildings.
    unique_vals_dict: dict[tuple[str, ...], set[tuple[str, ...]]] = {}
    for col in res_df.columns:
        no_change_set = set(compare_df[col].fillna("").unique())
        other_set = set(base_df[col].fillna("").unique())
        if only_in_no_change := no_change_set - other_set:
            print(f"Only {name} buildings have {col} in {sorted(only_in_no_change)}")
            unique_vals_dict[(col,)] = {(entry,) for entry in only_in_no_change}

    if not unique_vals_dict:
        print("No 1-column unique chracteristics found.")

    for combi_size in range(2, min(len(parameter_list) + 1, 5)):
        print(f"Checking {combi_size} column combinations out of {parameter_list}")
        found_uniq_chars = 0
        for cols in combinations(parameter_list, combi_size):
            compare_tups = compare_df[list(cols)].fillna("").drop_duplicates().itertuples(index=False, name=None)
            other_tups = base_df[list(cols)].fillna("").drop_duplicates().itertuples(index=False, name=None)
            only_in_compare = set(compare_tups) - set(other_tups)

            # remove cases arisen out of uniqueness found earlier with smaller subset of cols
            for sub_combi_size in range(1, len(cols)):
                for sub_cols in combinations(cols, sub_combi_size):
                    if sub_cols in unique_vals_dict:
                        known_unique = unique_vals_dict[sub_cols]
                        only_in_compare = {
                            val
                            for val in only_in_compare
                            if tuple(val[cols.index(sub_col)] for sub_col in sub_cols) not in known_unique
                        }

            if only_in_compare:
                print(f"Only {name} buildings have {cols} in {sorted(only_in_compare)} \n")
                found_uniq_chars += 1
                unique_vals_dict[cols] = only_in_compare

        if not found_uniq_chars:
            print(f"No {combi_size}-column unique chracteristics found.")
def save_detailed_report_all(self, file_path: str, logic_transform=None)
-
Save detailed text based upgrade report.
Args
file_path
:str
- Output file.
Expand source code
def save_detailed_report_all(self, file_path: str, logic_transform=None):
    """Write the detailed text report of every upgrade to a single file.

    The report of each upgrade is obtained from `get_detailed_report` and followed by a
    newline; the reports are written in upgrade order.

    Args:
        file_path (str): Output file.
        logic_transform: Forwarded to `get_detailed_report` as its `normalize_logic` argument.
    """
    n_upgrades = len(self.get_cfg()["upgrades"])
    sections = []
    for upgrade_num in range(1, n_upgrades + 1):
        logger.info(f"Getting report for upgrade {upgrade_num}")
        _, report_text = self.get_detailed_report(upgrade_num, normalize_logic=logic_transform)
        sections.append(report_text + "\n")
    # All reports are generated before the file is opened, so a failure leaves no partial file.
    with open(file_path, "w") as out_file:
        out_file.write("".join(sections))