Module buildstock_query.report_query

Source code
from pandas import DataFrame
from collections import Counter, defaultdict
import sqlalchemy as sa
from sqlalchemy.sql import func as safunc
import logging
import pandas as pd
from buildstock_query.helpers import print_r, print_g
from ast import literal_eval
from functools import reduce
import buildstock_query.main as main
import typing
from typing import Optional, Union, Literal, Hashable, Sequence
from buildstock_query.schema.utilities import AnyColType
from pydantic import validate_arguments, Field
from typing_extensions import assert_never

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
FUELS = ['electricity', 'natural_gas', 'propane', 'fuel_oil', 'coal', 'wood_cord', 'wood_pellets']


class BuildStockReport:
    """Class with a collection of functions for reporting and integrity check queries.
    """

    def __init__(self, bsq: 'main.BuildStockQuery') -> None:
        self._bsq = bsq

    def _rename_completed_status_column(self, df: DataFrame) -> DataFrame:
        df = df.rename(columns={self._bsq.db_schema.column_names.completed_status: "completed_status"})
        rev_value_map = {db_val: normal_val for normal_val, db_val in self._bsq.db_schema.completion_values}
        df["completed_status"] = df["completed_status"].map(rev_value_map)
        return df

    @typing.overload
    def _get_bs_success_report(self, get_query_only: Literal[False] = False) -> DataFrame:
        ...

    @typing.overload
    def _get_bs_success_report(self, get_query_only: Literal[True]) -> str:
        ...

    @typing.overload
    def _get_bs_success_report(self, get_query_only: bool) -> Union[DataFrame, str]:
        ...

    def _get_bs_success_report(self, get_query_only: bool = False):
        bs_query = sa.select([self._bsq._bs_completed_status_col, safunc.count().label("count")])
        bs_query = bs_query.group_by(sa.text('1'))
        if get_query_only:
            return self._bsq._compile(bs_query)
        df = self._bsq.execute(bs_query)
        df = self._rename_completed_status_column(df)
        df.insert(0, 'upgrade', 0)
        return self._process_report(df)

    @typing.overload
    def _get_change_report(self, get_query_only: Literal[False] = False) -> DataFrame:
        ...

    @typing.overload
    def _get_change_report(self, get_query_only: Literal[True]) -> list[str]:
        ...

    @typing.overload
    def _get_change_report(self, get_query_only: bool) -> Union[DataFrame, list[str]]:
        ...

    def _get_change_report(self, get_query_only: bool = False):
        """Returns counts of buildings to which upgrade didn't do any changes on energy consumption

        Args:
            get_query_only (bool, optional): _description_. Defaults to False.
        """
        if self._bsq.up_table is None:
            raise ValueError("No upgrade table is available .")

        queries: list[str] = []
        chng_types = ["no-chng", "bad-chng", "ok-chng", "true-bad-chng", "true-ok-chng", "null", "any"]
        for ch_type in chng_types:
            up_query = sa.select([self._bsq.up_table.c['upgrade'], safunc.count().label("change")])
            up_query = up_query.join(self._bsq.bs_table, self._bsq.bs_bldgid_column == self._bsq.up_bldgid_column)
            conditions = self._get_change_conditions(change_type=ch_type)
            up_query = up_query.where(sa.and_(self._bsq._bs_successful_condition,
                                              self._bsq._up_successful_condition,
                                              conditions))  # type: ignore
            up_query = up_query.group_by(sa.text('1'))
            up_query = up_query.order_by(sa.text('1'))
            queries.append(self._bsq._compile(up_query))
        if get_query_only:
            return queries
        change_df: DataFrame = pd.DataFrame()
        for chng_type, query in zip(chng_types, queries):
            df = self._bsq.execute(query)
            df.rename(columns={"change": chng_type}, inplace=True)
            df['upgrade'] = df['upgrade'].map(int)
            df = df.set_index('upgrade').sort_index()
            change_df = change_df.join(df, how='outer') if len(change_df) > 0 else df
        change_df = change_df.fillna(0)
        for chng_type in chng_types:
            if chng_type not in change_df.columns:
                change_df[chng_type] = 0
        return change_df

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def print_change_details(self, upgrade_id: int, yml_file: str, opt_sat_path: str,
                             change_type: Literal["no-chng", "bad-chng", "ok-chng", "true-bad-chng",
                                                  "true-ok-chng", "null", "any"] = 'no-chng'):
        ua = self._bsq.get_upgrades_analyzer(yml_file, opt_sat_path)
        bad_bids = self.get_buildings_by_change(upgrade_id=upgrade_id, change_type=change_type)
        good_bids = self.get_buildings_by_change(upgrade_id=upgrade_id, change_type='ok-chng')
        ua.print_unique_characteristic(upgrade_id, change_type, good_bids, bad_bids)

    @typing.overload
    def _get_upgrade_buildings(self, *, upgrade_id: int, trim_missing_bs: bool = True,
                               get_query_only: Literal[False] = False) -> list[int]:
        ...

    @typing.overload
    def _get_upgrade_buildings(self, *, upgrade_id: int, get_query_only: Literal[True],
                               trim_missing_bs: bool = True) -> str:
        ...

    @typing.overload
    def _get_upgrade_buildings(self, *, upgrade_id: int, get_query_only: bool,
                               trim_missing_bs: bool = True) -> Union[list[int], str]:
        ...

    def _get_upgrade_buildings(self, *, upgrade_id: int, trim_missing_bs: bool = True, get_query_only: bool = False):
        if self._bsq.up_table is None:
            raise ValueError("No upgrade table is available .")
        up_query = sa.select([self._bsq.up_bldgid_column])
        if trim_missing_bs:
            up_query = up_query.join(self._bsq.bs_table, self._bsq.bs_bldgid_column == self._bsq.up_bldgid_column)
            up_query = up_query.where(sa.and_(self._bsq._bs_successful_condition,
                                              self._bsq._up_successful_condition,
                                              self._bsq.up_table.c['upgrade'] == str(upgrade_id),
                                              ))
        else:
            up_query = up_query.where(sa.and_(self._bsq.up_table.c['upgrade'] == str(upgrade_id),
                                              self._bsq._up_successful_condition))
        if get_query_only:
            return self._bsq._compile(up_query)
        df = self._bsq.execute(up_query)
        return df[self._bsq.bs_bldgid_column.name].to_numpy(dtype='int32').tolist()

    def _get_change_conditions(self, change_type: str):
        if self._bsq.up_table is None:
            raise ValueError("No upgrade table is available .")

        threshold = 1e-3
        fuel_cols = list(c for c in self._bsq.db_schema.column_names.fuel_totals
                         if c in self._bsq.up_table.columns)  # Look at all fuel type totals
        all_cols = list(fuel_cols)
        if self._bsq.db_schema.column_names.unmet_hours_cooling_hr:
            all_cols += [self._bsq.db_schema.column_names.unmet_hours_cooling_hr]
        if self._bsq.db_schema.column_names.unmet_hours_heating_hr:
            all_cols += [self._bsq.db_schema.column_names.unmet_hours_heating_hr]
        null_chng_conditions = sa.and_(*[sa.or_(self._bsq.up_table.c[col] == sa.null(),
                                                self._bsq.bs_table.c[col] == sa.null()
                                                ) for col in fuel_cols])

        no_chng_conditions = sa.and_(*[safunc.coalesce(safunc.abs(self._bsq.up_table.c[col] -
                                                                  self._bsq.bs_table.c[col]), 0) < threshold
                                       for col in fuel_cols])
        good_chng_conditions = sa.or_(
            *[self._bsq.bs_table.c[col] - self._bsq.up_table.c[col] >= threshold for col in fuel_cols])
        opp_chng_conditions = sa.and_(*[safunc.coalesce(self._bsq.bs_table.c[col] - self._bsq.up_table.c[col], -1) <
                                        threshold for col in fuel_cols], sa.not_(no_chng_conditions))
        true_good_chng_conditions = sa.or_(*[self._bsq.bs_table.c[col] - self._bsq.up_table.c[col] >= threshold
                                             for col in all_cols])
        true_opp_chng_conditions = sa.and_(*[safunc.coalesce(self._bsq.bs_table.c[col] - self._bsq.up_table.c[col], -1)
                                             < threshold for col in all_cols], sa.not_(no_chng_conditions))
        if change_type == 'no-chng':
            conditions = no_chng_conditions
        elif change_type == 'bad-chng':
            conditions = opp_chng_conditions
        elif change_type == 'true-bad-chng':
            conditions = true_opp_chng_conditions
        elif change_type == 'ok-chng':
            conditions = good_chng_conditions
        elif change_type == 'true-ok-chng':
            conditions = true_good_chng_conditions
        elif change_type == 'null':
            conditions = null_chng_conditions
        elif change_type == 'any':
            conditions = sa.true()
        else:
            raise ValueError(f"Invalid {change_type=}")
        return conditions

    @typing.overload
    def get_buildings_by_change(self, *, upgrade_id: int, get_query_only: Literal[True],
                                change_type: Literal["no-chng", "bad-chng", "ok-chng", "true-bad-chng",
                                                     "true-ok-chng", "null", "any"] = 'no-chng'
                                ) -> str:
        ...

    @typing.overload
    def get_buildings_by_change(self, *,  upgrade_id: int, get_query_only: Literal[False] = False,
                                change_type: Literal["no-chng", "bad-chng", "ok-chng", "true-bad-chng",
                                                     "true-ok-chng", "null", "any"] = 'no-chng'
                                ) -> list[int]:
        ...

    @typing.overload
    def get_buildings_by_change(self, *, upgrade_id: int, get_query_only: bool,
                                change_type: Literal["no-chng", "bad-chng", "ok-chng", "true-bad-chng",
                                                     "true-ok-chng", "null", "any"] = 'no-chng'
                                ) -> Union[list[int], str]:
        ...

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def get_buildings_by_change(self, *, upgrade_id: int,
                                change_type: Literal["no-chng", "bad-chng", "ok-chng", "true-bad-chng",
                                                     "true-ok-chng", "null", "any"] = 'no-chng',
                                get_query_only: bool = False):

        if self._bsq.up_table is None:
            raise ValueError("No upgrade table is available .")
        up_query = sa.select([self._bsq.bs_bldgid_column, self._bsq._bs_completed_status_col,
                              self._bsq._up_completed_status_col])
        up_query = up_query.join(self._bsq.up_table, self._bsq.bs_bldgid_column == self._bsq.up_bldgid_column)

        conditions = self._get_change_conditions(change_type)
        up_query = up_query.where(sa.and_(self._bsq._bs_successful_condition,
                                          self._bsq._up_successful_condition,
                                          self._bsq.up_table.c['upgrade'] == str(upgrade_id),
                                          conditions))  # type: ignore
        if get_query_only:
            return self._bsq._compile(up_query)
        df = self._bsq.execute(up_query)
        return df[self._bsq.bs_bldgid_column.name].to_numpy(dtype='int32').tolist()
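
    # Illustrative usage (not part of the module source): assuming `report` is a
    # BuildStockReport bound to a run that has an upgrade table,
    #     report.get_buildings_by_change(upgrade_id=1, change_type="bad-chng")
    # returns the building ids whose fuel totals changed in the wrong direction
    # for upgrade 1; change_type defaults to "no-chng".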

    @typing.overload
    def _get_up_success_report(self, *, get_query_only: Literal[True],
                               trim_missing_bs: bool = True) -> str:
        ...

    @typing.overload
    def _get_up_success_report(self, *, get_query_only: Literal[False] = False,
                               trim_missing_bs: bool = True) -> pd.DataFrame:
        ...

    @typing.overload
    def _get_up_success_report(self, *, get_query_only: bool,
                               trim_missing_bs: bool = True) -> Union[pd.DataFrame, str]:
        ...

    def _get_up_success_report(self, *, trim_missing_bs: bool = True, get_query_only: bool = False):
        """Get success report for upgrades

        Args:
            trim_missing_bs (bool, optional): Ignore buildings that have no successful runs in the baseline.
                Defaults to True.
            get_query_only (bool, optional): Returns query only without the result. Defaults to False.

        Returns:
            Union[str, pd.DataFrame]: If get_query_only then returns the query string. Otherwise returns the dataframe.
        """
        if self._bsq.up_table is None:
            raise ValueError("No upgrade table is available .")
        up_query = sa.select([self._bsq.up_table.c['upgrade'], self._bsq._up_completed_status_col,
                              safunc.count().label("count")])
        if trim_missing_bs:
            up_query = up_query.join(self._bsq.bs_table, self._bsq.bs_bldgid_column == self._bsq.up_bldgid_column)
            up_query = up_query.where(self._bsq._bs_successful_condition)

        up_query = up_query.group_by(sa.text('1'), sa.text('2'))
        up_query = up_query.order_by(sa.text('1'), sa.text('2'))
        if get_query_only:
            return self._bsq._compile(up_query)
        df = self._bsq.execute(up_query)
        df = self._rename_completed_status_column(df)
        return self._process_report(df)

    def _process_report(self, df: DataFrame):
        df['upgrade'] = df['upgrade'].map(int)
        pf = df.pivot(index=['upgrade'], columns=['completed_status'],
                      values=['count'])
        pf.columns = [c[1] for c in pf.columns]
        pf['Sum'] = pf.sum(axis=1)
        for col in ['fail', 'unapplicable']:
            if col not in pf.columns:
                pf.insert(1, col, 0)
        return pf

    @typing.overload
    def _get_full_options_report(self, *, trim_missing_bs: bool, get_query_only: Literal[True]) -> str:
        ...

    @typing.overload
    def _get_full_options_report(self, *, trim_missing_bs: bool,
                                 get_query_only: Literal[False] = False) -> pd.DataFrame:
        ...

    @typing.overload
    def _get_full_options_report(self, *, trim_missing_bs: bool, get_query_only: bool) -> Union[pd.DataFrame, str]:
        ...

    def _get_full_options_report(self, trim_missing_bs: bool = True, get_query_only: bool = False):
        if self._bsq.up_table is None:
            raise ValueError("No upgrade table is available .")
        opt_name_cols = [c for c in self._bsq.up_table.columns if c.name.startswith("upgrade_costs.option_")
                         and c.name.endswith("name")]
        query = sa.select([self._bsq.up_table.c['upgrade']] + opt_name_cols + [safunc.count().label('success')]
                          + [safunc.array_agg(self._bsq.up_bldgid_column)])
        if trim_missing_bs:
            query = query.join(self._bsq.bs_table, self._bsq.bs_bldgid_column == self._bsq.up_bldgid_column)
            query = query.where(self._bsq._bs_successful_condition)
        grouping_texts = [sa.text(str(i+1)) for i in range(1+len(opt_name_cols))]
        query = query.group_by(*grouping_texts)
        query = query.order_by(*grouping_texts)
        if get_query_only:
            return self._bsq._compile(query)
        df = self._bsq.execute(query)
        simple_names = [f"option{i+1}" for i in range(len(opt_name_cols))]
        df.columns = ['upgrade'] + simple_names + ['success', "applied_buildings"]
        df['upgrade'] = df['upgrade'].map(int)
        df['applied_buildings'] = df['applied_buildings'].map(lambda x: literal_eval(x))
        applied_rows = df[simple_names].any(axis=1)  # select only rows with at least one option applied
        return df[applied_rows]

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def get_options_report(self, trim_missing_bs: bool = True) -> pd.DataFrame:
        """Finds out the number and list of buildings each of the options applied to.

        Args:
            trim_missing_bs (bool, optional): Whether the buildings that are not available in the baseline should be dropped.
                                              Defaults to True.
        Returns:
            pd.DataFrame: The list of options and the corresponding set of building ids each option applied to.
        """
        if self._bsq.up_table is None:
            raise ValueError("No upgrade table is available .")

        full_report = self._get_full_options_report(trim_missing_bs=trim_missing_bs)
        option_cols = [c for c in full_report.columns if c.startswith("option")]
        total_counts: Counter = Counter()
        bldg_array: dict = defaultdict(list)
        for option in option_cols:
            grouped_dict = full_report.groupby(['upgrade', option]).aggregate({'success': 'sum',
                                                                               'applied_buildings': 'sum'}).to_dict()
            total_counts += Counter(grouped_dict['success'])
            for key, val in grouped_dict['applied_buildings'].items():
                bldg_array[key] += val

        option_df = pd.DataFrame.from_dict({'success': total_counts,
                                            'applied_buildings': bldg_array,
                                            }, orient='columns')
        option_df['applied_buildings'] = option_df['applied_buildings'].map(lambda x: set(x))
        option_df = option_df.reset_index()
        option_df.columns = ['upgrade', 'option', 'success', 'applied_buildings']
        # Aggregate for upgrade
        agg = option_df.groupby('upgrade').aggregate({'applied_buildings': lambda x: reduce(set.union, x)})
        agg = agg.reset_index()
        agg.insert(1, 'success', agg['applied_buildings'].map(lambda x: len(x)))
        agg.insert(0, 'option', 'All')
        full_df = pd.concat([option_df, agg])
        full_df = full_df.sort_values(['upgrade', 'option'])
        return full_df
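
    # Illustrative usage (not part of the module source): with `report` as above,
    #     report.get_options_report()
    # returns one row per (upgrade, option) plus an 'All' row per upgrade, carrying
    # the success count and the set of applied building ids.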

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def get_option_integrity_report(self, yaml_file: str, opt_sat_path: str) -> pd.DataFrame:
        """Checks the upgrade/option spec in the buildstock configuration file against what is actually in the
        simulation results and tabulates any discrepancies.
        Args:
            yaml_file (str): The path to the buildstock configuration file used to run the simulation.

        Returns:
            pd.DataFrame: The report dataframe.
        """
        ua_df = self._bsq.get_upgrades_analyzer(yaml_file, opt_sat_path).get_report()
        ua_df = ua_df.groupby(['upgrade', 'option']).aggregate({'applicable_to': 'sum',
                                                                'applicable_buildings': lambda x: reduce(set.union, x)})
        assert (ua_df['applicable_to'] == ua_df['applicable_buildings'].map(lambda x: len(x))).all()
        opt_report_df = self.get_options_report().fillna(0)
        opt_report_df = opt_report_df.set_index(['upgrade', 'option'])
        diff_df = pd.DataFrame(index=ua_df.index)
        diff_df['applicable_buildings'] = ua_df['applicable_buildings']
        diff_df['applied_buildings'] = opt_report_df['applied_buildings']
        diff_df['overapplied_bldgs'] = opt_report_df['applied_buildings'] - ua_df['applicable_buildings']
        diff_df['unapplied_bldgs'] = ua_df['applicable_buildings'] - opt_report_df['applied_buildings']
        for col in diff_df.columns:
            diff_df[f"{col}_count"] = diff_df[col].map(lambda x: len(x) if isinstance(x, set) else 0)
        success_report_df = self.get_success_report()
        fail_report = success_report_df[['fail', 'success']].rename(columns={'fail': "Upgrade Failures",
                                                                             'success': "Upgrade Success"})
        diff_df = diff_df.join(fail_report)
        return diff_df

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def check_options_integrity(self, yaml_file: str, opt_sat_path: str) -> bool:
        """ Checks the upgrade/option spec in the buildstock configuration file against what is actually in the
        simulation results and flags any discrepancies. The verification allows for some mismatch since some
        simulations could have failed. Unless there is a bug somewhere in the buildstock workflow, the integrity check
        should pass regardless of the number of failures.

        Args:
            yaml_file (str): The path to the buildstock configuration file used to run the simulation.

        Returns:
            bool: Whether or not the integrity check passed.
        """
        intg_df = self.get_option_integrity_report(yaml_file, opt_sat_path).reset_index()
        all_intg_df = intg_df[intg_df['option'] == 'All']
        blank_opt_upgrades = all_intg_df[all_intg_df['applied_buildings_count'] < all_intg_df['Upgrade Success']]
        assert (all_intg_df['applied_buildings_count'] >= all_intg_df['Upgrade Success']).all()
        if len(blank_opt_upgrades) > 0:
            print_r("BLANK OPTIONS: The following upgrades have fewere 'applied_buildings_count' than 'Upgrade Success'"
                    "This indicates that some buildings in these upgrades didn't have any option applied")
        serious = False
        for indx, row in intg_df.iterrows():
            upgrade_failures = row['Upgrade Failures']
            applicable_count = row.applicable_buildings_count
            applied_count = row.applied_buildings_count
            unapplied_count = row.unapplied_bldgs_count
            overapplied_count = row.overapplied_bldgs_count
            if row.unapplied_bldgs_count > 0:
                if row.option == 'All' and row.unapplied_bldgs_count == upgrade_failures:
                    print_r(
                        f"OPTION UNDERAPPLICATION: Upgrade {row.upgrade} was was supposed to be applied to "
                        f"{applicable_count} samples but applied to {applied_count} samples")
                    print_g(f"This difference of {unapplied_count} exactly matches with {upgrade_failures}"
                            f" failures in Upgrade {row.upgrade}. It's all good.")
                else:
                    print_r(f"OPTION UNDERAPPLICATION: Upgrade {row.upgrade}, {row.option} didn't apply to "
                            f"{unapplied_count} samples that it was supposed to apply to.")
                    if upgrade_failures > 0:
                        if unapplied_count > upgrade_failures:
                            print_r(f"{upgrade_failures} failures in Upgrade {row.upgrade} can't account for this.")
                            serious = True
                        else:
                            print_g(f"{upgrade_failures} failures in Upgrade {row.upgrade} may account for this.")
                    else:
                        serious = True

            if overapplied_count > 0:
                print_r(f"OPTION OVERAPPLICATION: Upgrade {row.upgrade}, {row.option} applied to"
                        f" {unapplied_count} samples that it was supposed to NOT apply to.")
                serious = True
        if not serious:
            print_g("Integrity check passed.")
            return True
        else:
            print_r("Integrity check failed. Please check the serious issues above.")
            return False

    @typing.overload
    def get_success_report(self, *, get_query_only: Literal[True],
                           trim_missing_bs: Union[Literal['auto'], bool] = 'auto') -> tuple[str, str, list[str]]:
        ...

    @typing.overload
    def get_success_report(self, *, get_query_only: Literal[False] = False,
                           trim_missing_bs: Union[Literal['auto'], bool] = 'auto') -> pd.DataFrame:
        ...

    @typing.overload
    def get_success_report(self, *, get_query_only: bool,
                           trim_missing_bs: Union[Literal['auto'], bool] = 'auto'
                           ) -> Union[pd.DataFrame, tuple[str, str, list[str]]]:
        ...

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def get_success_report(self, trim_missing_bs: Union[Literal['auto'], bool] = 'auto',
                           get_query_only: bool = False):
        """Returns a basic report showing number of success and failures for each upgrade along with percentage.
        Additional information regarding number of buildings to which the upgrade applied and whether the enduses
        changed is also returned.

        Args:
            trim_missing_bs (str | bool, optional): Whether the buildings that failed in baseline should be
                                                    dropped from the upgrades. If true, all metrics are calculated after
                                                    those buildings are dropped from the upgrades. Defaults to 'auto'.
            get_query_only (bool, optional): If true, returns SQL query instead of the report. Defaults to False.

        Raises:
            ValueError: If something went wrong.

        Returns:
            pd.DataFrame: The report dataframe. The meanings of the various columns are as follows:
                **Fail**: Number of simulations that failed.  
                **unapplicable**: Number of buildings to which the upgrade didn't apply (because of apply logic)  
                **Success**: The number of buildings that completed the simulation successfully. No simulation is run
                    for unapplicable buildings.  
                **Sum**: Sum of the first three columns.  
                **Applied %**: Success / Sum * 100 %.  
                **no-chng**: Number of successful simulations that didn't have any change in values for any enduses.  
                **bad-chng:** Number of successful simulations that had bad changes. It's considered a bad change if
                   none of the fuels has any reduction in energy consumption, and at least one fuel has an
                   increase in energy consumption.  
                **ok-chng:** |set(success) - set(no-chng) - set(bad-chng)|, i.e. count of successful simulations that
                         are neither no-chng nor bad-chng.  
                **true-bad-chng:** Count of only those bad changes in which neither of the unmet cooling/heating hours
                               decreased. In other words, the increase in energy consumption in one of the fuel types
                               (often electricity - for electrification upgrades) didn't result in an improvement of
                               the cooling/heating unmet hours.  
                **true-ok-chng:** Adjustment of ok-chng after using true-bad-chng instead of bad-chng.  
                **null**: Included for testing/integrity-checking purposes. It refers to the number of buildings that
                      are neither no-chng, bad-chng, nor ok-chng. It should always be zero.  
                **any**: Sum of the no-chng + bad-chng + ok-chng. Refers to any change (including no-change).  
                **x-chng %**: The percentage form of the change calculated by using success count as the base.  
        """  # noqa: W291

        baseline_result = self._get_bs_success_report()

        if self._bsq.up_table is None:
            return baseline_result

        if trim_missing_bs == 'auto':
            if 'success' in baseline_result:
                trim = True
            else:
                logger.warning("None of the simulation was successful in baseline. The counts for upgrade will be"
                               " returned without requiring corresponding successful baseline run.")
                trim = False
        elif isinstance(trim_missing_bs, bool):
            trim = trim_missing_bs
        else:
            assert_never(trim_missing_bs)
            raise ValueError("trim_missing_bs must be either True/False or 'auto'.")

        if get_query_only:
            baseline_query = self._get_bs_success_report(get_query_only=True)
            upgrade_query = self._get_up_success_report(trim_missing_bs=trim, get_query_only=True)
            change_query = self._get_change_report(get_query_only=True)
            return baseline_query, upgrade_query, change_query

        upgrade_result = self._get_up_success_report(trim_missing_bs=trim).fillna(0)
        change_result = self._get_change_report().fillna(0)
        if 'success' in upgrade_result.columns:
            pa = round(100 * (upgrade_result['fail'] + upgrade_result['success']) /
                       upgrade_result['Sum'], 1)
            upgrade_result['Applied %'] = pa

        pf = pd.concat([baseline_result, upgrade_result])
        pf = pf.join(change_result).fillna(0)
        pf['no-chng %'] = round(100 * pf['no-chng'] / pf['success'], 1)
        pf['bad-chng %'] = round(100 * pf['bad-chng'] / pf['success'], 1)
        pf['ok-chng %'] = round(100 * pf['ok-chng'] / pf['success'], 1)
        pf['true-ok-chng %'] = round(100 * pf['true-ok-chng'] / pf['success'], 1)
        pf['true-bad-chng %'] = round(100 * pf['true-bad-chng'] / pf['success'], 1)
        return pf
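
    # Illustrative usage (not part of the module source):
    #     report.get_success_report()
    # returns one row per upgrade (row 0 is the baseline) with the fail/unapplicable/
    # success counts, their Sum, and the *-chng columns described in the docstring.
    # Passing get_query_only=True returns the (baseline, upgrade, change) queries instead.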

    @typing.overload
    def _get_ts_report(self, get_query_only: Literal[False] = False) -> DataFrame:
        ...

    @typing.overload
    def _get_ts_report(self, get_query_only: Literal[True]) -> str:
        ...

    @typing.overload
    def _get_ts_report(self, get_query_only: bool) -> Union[DataFrame, str]:
        ...

    def _get_ts_report(self, get_query_only: bool = False):
        if self._bsq.ts_table is None:
            raise ValueError("No upgrade table is available .")

        ts_query = sa.select([self._bsq.ts_table.c['upgrade'],
                              safunc.count(self._bsq.ts_bldgid_column.distinct()).label("count")])
        ts_query = ts_query.group_by(sa.text('1'))
        ts_query = ts_query.order_by(sa.text('1'))
        if get_query_only:
            return self._bsq._compile(ts_query)
        df = self._bsq.execute(ts_query)
        df['upgrade'] = df['upgrade'].map(int)
        df = df.set_index('upgrade')
        df = df.rename(columns={'count': 'success'})
        return df

    def check_ts_bs_integrity(self) -> bool:
        """Checks the integrity between the timeseries and baseline (metadata) tables.

        Returns:
            bool: Whether or not the integrity check passed.
        """
        logger.info("Checking integrity with ts_tables ...")
        raw_ts_report = self._get_ts_report()
        raw_success_report = self.get_success_report(trim_missing_bs=False)
        if self._bsq.db_schema.structure.unapplicables_have_ts:
            bs_dict = raw_success_report[['unapplicable', 'success']].sum(axis=1).to_dict()
        else:
            bs_dict = raw_success_report['success'].to_dict()
        ts_dict = raw_ts_report.to_dict()['success']
        check_pass = True
        for upgrade, count in ts_dict.items():
            if count != bs_dict.get(upgrade, 0):
                print_r(f"Upgrade {upgrade} has {count} samples in timeseries table, but {bs_dict.get(upgrade, 0)}"
                        " samples in baseline/upgrade table.")
                check_pass = False
        if check_pass:
            print_g("Annual and timeseries tables are verified to have the same number of buildings.")
        try:
            rowcount = self._bsq._get_rows_per_building()
            print_g(f"All buildings are verified to have the same number of ({rowcount}) timeseries rows.")
        except ValueError:
            check_pass = False
            print_r("Different buildings have different number of timeseries rows.")
        return check_pass
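
    # Illustrative usage (not part of the module source):
    #     report.check_ts_bs_integrity()
    # prints any per-upgrade mismatch between the timeseries and annual tables and
    # returns True only if both the building-count and rows-per-building checks pass.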

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def get_successful_simulation_count(self, *,
                                        restrict: Sequence[tuple[AnyColType,
                                                                 Union[str, int, Sequence[Union[int, str]]]]] =
                                        Field(default_factory=list),
                                        get_query_only: bool = False):
        """
        Returns the count of successful simulations for the given restrict conditions in the baseline.
        Args:
            restrict: The list of where conditions to restrict the results to. It should be specified as a list of
                      tuples. Example: `[('state',['VA','AZ']), ("build_existing_model.lighting",['60% CFL']), ...]`
            get_query_only: If set to true, returns the query instead of the result.

        Returns:
            Pandas integer counting the number of successful simulations
        """
        query = sa.select(safunc.count().label("count"))

        restrict = list(restrict) if restrict else []
        restrict.insert(0, (self._bsq.db_schema.column_names.completed_status,
                            [self._bsq.db_schema.completion_values.success]))
        query = self._bsq._add_restrict(query, restrict, bs_only=True)
        if get_query_only:
            return self._bsq._compile(query)

        return self._bsq.execute(query)
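
    # Illustrative usage (not part of the module source), mirroring the docstring example:
    #     report.get_successful_simulation_count(restrict=[('state', ['VA', 'AZ'])])
    # counts the baseline buildings in VA or AZ whose simulation completed successfully.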

    @typing.overload
    def get_applied_options(self, *, upgrade_id: Union[str, int], bldg_ids: list[int],
                            include_base_opt: Literal[True]) -> list[dict[str, str]]:
        ...

    @typing.overload
    def get_applied_options(self, *, upgrade_id: Union[str, int], bldg_ids: list[int],
                            include_base_opt: Literal[False] = False) -> list[set[str]]:
        ...

    @typing.overload
    def get_applied_options(self, *, upgrade_id: Union[str, int], bldg_ids: list[int],
                            include_base_opt: bool) -> list[Union[dict[str, str], set[str]]]:
        ...

    @validate_arguments(config=dict(arbitrary_types_allowed=True))
    def get_applied_options(self, upgrade_id: Union[str, int], bldg_ids: list[int],
                            include_base_opt: bool = False):
        """Returns the list of options applied to each buildings for a given upgrade.

        Args:
            upgrade_id (int | str): The upgrade for which to find the applied options.
            bldg_ids (list[int]): List of building ids.
            include_base_opt (bool, optional): If baseline value is to be included. Defaults to False.

        Returns:
            list[set|dict]: List of options (along with baseline chars, if include_base_opt is true)
        """
        up_csv = self._bsq.get_upgrades_csv_full(upgrade_id=int(upgrade_id))
        rel_up_csv = up_csv.loc[bldg_ids]
        upgrade_cols = [key for key in up_csv.columns
                        if key.startswith("upgrade_costs.option_") and key.endswith("_name")]

        if include_base_opt:
            base_csv = self._bsq.get_results_csv_full()
            rel_base_csv = base_csv.loc[bldg_ids]
            rel_base_csv = rel_base_csv.rename(columns=lambda c: c.split('.')[1] if '.' in c else c)
            char_df = rel_up_csv[upgrade_cols].fillna('').agg(
                lambda x: {'_'.join(v.split('|')[0].lower().split()) for v in x if v}, axis=1)
            all_chars = [c for c in reduce(set.union, char_df.values) if c in set(rel_base_csv.columns)]
            char_dict: dict[Hashable, dict[str, str]] = rel_base_csv[all_chars].to_dict(orient='index')

            def add_base_chars(options: list):
                bldg_id = options[0]  # first entry is building_id
                return {opt: char_dict[bldg_id].get('_'.join(opt.split('|')[0].lower().split()), '')
                        for opt in options[1:]}
            opt_df: pd.Series = rel_up_csv[upgrade_cols].fillna('').reset_index().agg(lambda x: [v for v in x if v],
                                                                                      axis=1)
            return_val = opt_df.map(add_base_chars).to_list()
        else:
            return_val = rel_up_csv[upgrade_cols].fillna('').agg(lambda x: {v for v in x if v},
                                                                 axis=1).to_list()

        return return_val
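
    # Illustrative usage (not part of the module source):
    #     report.get_applied_options(upgrade_id=1, bldg_ids=[1, 2, 3])
    # returns, per building, the set of option names applied in upgrade 1; with
    # include_base_opt=True each entry is instead a dict mapping every applied option
    # to the building's corresponding baseline characteristic.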

    @validate_arguments(config=dict(arbitrary_types_allowed=True))
    def get_enduses_buildings_map_by_change(self, upgrade_id: Union[str, int],
                                            change_type: str = 'changed',
                                            bldg_list: Optional[list[int]] = None) -> dict[str, pd.Index]:
        """Finds the list of enduses and the buildings that had change in the enduses for a given change type.
        Args:
            upgrade (int | stsr): The upgrade to look at.
            change_type (str, optional): The kind of change to look for. Valid values are increased, decreased and
                                         and changed. Defaults to 'changed' which includes both cases.
            bldg_list (list[int], optional): The list of buildings to narrow down to. If omitted, searches through all
                                             all the buildings in the upgrade. Defaults to None.

        Returns:
            dict[str, pd.Index]: Dict mapping enduses that had a given change and building ids showing that change.
        """
        up_csv = self._bsq.get_upgrades_csv_full(upgrade_id=int(upgrade_id))
        bs_csv = self._bsq.get_results_csv_full()
        if bldg_list:
            up_csv = up_csv.loc[bldg_list]
            bs_csv = bs_csv.loc[bldg_list]

        def clean_column(col: str):
            col = col.removeprefix(self._bsq._out_prefix)
            col = col.removeprefix("end_use_")
            col = col.removeprefix("fuel_use_")
            return col

        def get_pure_enduse(col):
            for fuel in FUELS:
                col = col.removeprefix(f"{fuel}_")
            return col

        end_use_cols = [c for c in up_csv.columns if ('end_use' in c) or ('fuel_use' in c) or ('unmet_hours_' in c)]
        up_csv = up_csv[end_use_cols].rename(columns=clean_column)
        bs_csv = bs_csv[end_use_cols].rename(columns=clean_column)

        pure_enduses = {get_pure_enduse(c) for c in up_csv.columns}

        def get_all_fuel_enduses(df, end_use):
            return [col for col in df.columns if col.endswith(end_use)]

        def add_all_fuel_cols(df):
            for end_use in pure_enduses:
                df[f"all_fuel_{end_use}"] = df[get_all_fuel_enduses(df, end_use)].sum(axis=1)
            return df

        add_all_fuel_cols(up_csv)
        add_all_fuel_cols(bs_csv)

        diff = up_csv - bs_csv
        enduses_df = diff.transpose()
        if change_type == 'decreased':
            enduses_df = enduses_df < -1e-12
        elif change_type == 'increased':
            enduses_df = enduses_df > 1e-12
        else:
            enduses_df = enduses_df.abs() > 1e-12
        change_dict = enduses_df.apply(lambda x: enduses_df.columns[x], axis=1).to_dict()
        clean_dict = {key: value for key, value in change_dict.items() if len(value) > 0}
        return clean_dict

Classes

class BuildStockReport (bsq: main.BuildStockQuery)

Class with a collection of functions for reporting and integrity check queries.
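
A minimal usage sketch (illustrative, not taken from the source; the BuildStockQuery constructor arguments below are placeholders, so consult BuildStockQuery's own documentation for its actual parameters):

import buildstock_query.main as main
from buildstock_query.report_query import BuildStockReport

# Placeholder arguments; the real BuildStockQuery parameters depend on your database/run setup.
bsq = main.BuildStockQuery(workgroup="my_workgroup", db_name="my_db", table_name="my_run")

report = BuildStockReport(bsq)
success_df = report.get_success_report()        # per-upgrade success/failure and change counts
no_chng_bldgs = report.get_buildings_by_change(upgrade_id=1, change_type="no-chng")
options_df = report.get_options_report()        # which options applied to which buildings
ts_ok = report.check_ts_bs_integrity()          # timeseries vs. annual table consistency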

        assert (all_intg_df['applied_buildings_count'] >= all_intg_df['Upgrade Success']).all()
        if len(blank_opt_upgrades) > 0:
            print_r("BLANK OPTIONS: The following upgrades have fewere 'applied_buildings_count' than 'Upgrade Success'"
                    "This indicates that some buildings in these upgrades didn't have any option applied")
        serious = False
        for indx, row in intg_df.iterrows():
            upgrade_failures = row['Upgrade Failures']
            applicable_count = row.applicable_buildings_count
            applied_count = row.applied_buildings_count
            unapplied_count = row.unapplied_bldgs_count
            overapplied_count = row.overapplied_bldgs_count
            if row.unapplied_bldgs_count > 0:
                if row.option == 'All' and row.unapplied_bldgs_count == upgrade_failures:
                    print_r(
                        f"OPTION UNDERAPPLICATION: Upgrade {row.upgrade} was was supposed to be applied to "
                        f"{applicable_count} samples but applied to {applied_count} samples")
                    print_g(f"This difference of {unapplied_count} exactly matches with {upgrade_failures}"
                            f" failures in Upgrade {row.upgrade}. It's all good.")
                else:
                    print_r(f"OPTION UNDERAPPLICATION: Upgrade {row.upgrade}, {row.option} didn't apply to "
                            f"{unapplied_count} samples that it was supposed to apply to.")
                    if upgrade_failures > 0:
                        if unapplied_count > upgrade_failures:
                            print_r(f"{upgrade_failures} failures in Upgrade {row.upgrade} can't account for this.")
                            serious = True
                        else:
                            print_g(f"{upgrade_failures} failures in Upgrade {row.upgrade} may account for this.")
                    else:
                        serious = True

            if overapplied_count > 0:
                print_r(f"OPTION OVERAPPLICATION: Upgrade {row.upgrade}, {row.option} applied to"
                        f" {unapplied_count} samples that it was supposed to NOT apply to.")
                serious = True
        if not serious:
            print_g("Integrity check passed.")
            return True
        else:
            print_r("Integrity check failed. Please check the serious issues above.")
            return False

    @typing.overload
    def get_success_report(self, *, get_query_only: Literal[True],
                           trim_missing_bs: Union[Literal['auto'], bool] = 'auto') -> tuple[str, str, list[str]]:
        ...

    @typing.overload
    def get_success_report(self, *, get_query_only: Literal[False] = False,
                           trim_missing_bs: Union[Literal['auto'], bool] = 'auto') -> pd.DataFrame:
        ...

    @typing.overload
    def get_success_report(self, *, get_query_only: bool,
                           trim_missing_bs: Union[Literal['auto'], bool] = 'auto'
                           ) -> Union[pd.DataFrame, tuple[str, str, list[str]]]:
        ...

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def get_success_report(self, trim_missing_bs: Union[Literal['auto'], bool] = 'auto',
                           get_query_only: bool = False):
        """Returns a basic report showing number of success and failures for each upgrade along with percentage.
        Additional information regarding number of buildings to which the upgrade applied and whether the enduses
        changed is also returned.

        Args:
            trim_missing_bs (str | bool, optional): Whether the buildings that failed in the baseline should be
                                                    dropped from the upgrades. If true, all metrics are calculated after
                                                    those buildings are dropped from the upgrades. Defaults to 'auto'.
            get_query_only (bool, optional): If true, returns SQL query instead of the report. Defaults to False.

        Raises:
            ValueError: If something went wrong.

        Returns:
            pd.DataFrame: The report dataframe. The meaning of the various columns is as follows:
                **Fail**: Number of simulations that failed.  
                **unapplicable**: Number of buildings to which the upgrade didn't apply (because of apply logic).  
                **Success**: The number of buildings which completed simulation successfully. No simulation is run for
                    unapplicable buildings.  
                **Sum**: Sum of the first three columns.  
                **Applied %**: Success / Sum * 100 %.  
                **no-chng**: Number of successful simulations that didn't have any change in values for any enduses.  
                **bad-chng:** Number of successful simulations that had bad changes. It's considered a bad change if
                   none of the fuels has any reduction in energy consumption, and at least one fuel has an
                   increase in energy consumption.  
                **ok-chng:** |set(success) - set(no-chng) - set(bad-chng)| i.e. count of successful simulations that are
                         neither no-chng nor bad-chng.  
                **true-bad-chng:** Count of only those bad changes in which neither the unmet cooling nor heating hours
                               decreased. In other words, the increase in energy consumption in one of the fuel types
                               (often electricity - for electrification upgrades) didn't result in improvement of
                               unmet cooling/heating hours.  
                **true-ok-chng:** Adjustment of ok-chng after using true-bad-chng instead of bad-chng.  
                **null**: Included for testing/integrity-checking purposes. It refers to the number of buildings that
                      are neither no-chng, bad-chng, nor ok-chng. It should always be zero.  
                **any**: Sum of no-chng + bad-chng + ok-chng. Refers to any change (including no-change).  
                **x-chng %**: The percentage form of the change calculated using the success count as the base.  
        """  # noqa: W291

        baseline_result = self._get_bs_success_report()

        if self._bsq.up_table is None:
            return baseline_result

        if trim_missing_bs == 'auto':
            if 'success' in baseline_result:
                trim = True
            else:
                logger.warning("None of the simulation was successful in baseline. The counts for upgrade will be"
                               " returned without requiring corresponding successful baseline run.")
                trim = False
        elif isinstance(trim_missing_bs, bool):
            trim = trim_missing_bs
        else:
            assert_never(trim_missing_bs)
            raise ValueError("trim_missing_bs must be either True/False or 'auto'.")

        if get_query_only:
            baseline_query = self._get_bs_success_report(get_query_only=True)
            upgrade_query = self._get_up_success_report(trim_missing_bs=trim, get_query_only=True)
            change_query = self._get_change_report(get_query_only=True)
            return baseline_query, upgrade_query, change_query

        upgrade_result = self._get_up_success_report(trim_missing_bs=trim).fillna(0)
        change_result = self._get_change_report().fillna(0)
        if get_query_only:
            return baseline_result, upgrade_result, change_result
        if 'success' in upgrade_result.columns:
            pa = round(100 * (upgrade_result['fail'] + upgrade_result['success']) /
                       upgrade_result['Sum'], 1)
            upgrade_result['Applied %'] = pa

        pf = pd.concat([baseline_result, upgrade_result])
        pf = pf.join(change_result).fillna(0)
        pf['no-chng %'] = round(100 * pf['no-chng'] / pf['success'], 1)
        pf['bad-chng %'] = round(100 * pf['bad-chng'] / pf['success'], 1)
        pf['ok-chng %'] = round(100 * pf['ok-chng'] / pf['success'], 1)
        pf['true-ok-chng %'] = round(100 * pf['true-ok-chng'] / pf['success'], 1)
        pf['true-bad-chng %'] = round(100 * pf['true-bad-chng'] / pf['success'], 1)
        return pf

    @typing.overload
    def _get_ts_report(self, get_query_only: Literal[False] = False) -> DataFrame:
        ...

    @typing.overload
    def _get_ts_report(self, get_query_only: Literal[True]) -> str:
        ...

    @typing.overload
    def _get_ts_report(self, get_query_only: bool) -> Union[DataFrame, str]:
        ...

    def _get_ts_report(self, get_query_only: bool = False):
        if self._bsq.ts_table is None:
            raise ValueError("No upgrade table is available .")

        ts_query = sa.select([self._bsq.ts_table.c['upgrade'],
                              safunc.count(self._bsq.ts_bldgid_column.distinct()).label("count")])
        ts_query = ts_query.group_by(sa.text('1'))
        ts_query = ts_query.order_by(sa.text('1'))
        if get_query_only:
            return self._bsq._compile(ts_query)
        df = self._bsq.execute(ts_query)
        df['upgrade'] = df['upgrade'].map(int)
        df = df.set_index('upgrade')
        df = df.rename(columns={'count': 'success'})
        return df

    def check_ts_bs_integrity(self) -> bool:
        """Checks the integrity between the timeseries and baseline (metadata) tables.

        Returns:
            bool: Whether or not the integrity check passed.
        """
        logger.info("Checking integrity with ts_tables ...")
        raw_ts_report = self._get_ts_report()
        raw_success_report = self.get_success_report(trim_missing_bs=False)
        if self._bsq.db_schema.structure.unapplicables_have_ts:
            bs_dict = raw_success_report[['unapplicable', 'success']].sum(axis=1).to_dict()
        else:
            bs_dict = raw_success_report['success'].to_dict()
        ts_dict = raw_ts_report.to_dict()['success']
        check_pass = True
        for upgrade, count in ts_dict.items():
            if count != bs_dict.get(upgrade, 0):
                print_r(f"Upgrade {upgrade} has {count} samples in timeseries table, but {bs_dict.get(upgrade, 0)}"
                        " samples in baseline/upgrade table.")
                check_pass = False
        if check_pass:
            print_g("Annual and timeseries tables are verified to have the same number of buildings.")
        try:
            rowcount = self._bsq._get_rows_per_building()
            print_g(f"All buildings are verified to have the same number of ({rowcount}) timeseries rows.")
        except ValueError:
            check_pass = False
            print_r("Different buildings have different number of timeseries rows.")
        return check_pass

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def get_successful_simulation_count(self, *,
                                        restrict: Sequence[tuple[AnyColType,
                                                                 Union[str, int, Sequence[Union[int, str]]]]] =
                                        Field(default_factory=list),
                                        get_query_only: bool = False):
        """
        Returns the count of successful simulations for the given restrict conditions in the baseline.
        Args:
            restrict: The list of where conditions to restrict the results to. It should be specified as a list of tuples.
                      Example: `[('state',['VA','AZ']), ("build_existing_model.lighting",['60% CFL']), ...]`
            get_query_only: If set to true, returns the list of queries to run instead of the result.

        Returns:
            Pandas integer counting the number of successful simulations
        """
        query = sa.select(safunc.count().label("count"))

        restrict = list(restrict) if restrict else []
        restrict.insert(0, (self._bsq.db_schema.column_names.completed_status,
                            [self._bsq.db_schema.completion_values.success]))
        query = self._bsq._add_restrict(query, restrict, bs_only=True)
        if get_query_only:
            return self._bsq._compile(query)

        return self._bsq.execute(query)

    @typing.overload
    def get_applied_options(self, *, upgrade_id: Union[str, int], bldg_ids: list[int],
                            include_base_opt: Literal[True]) -> list[dict[str, str]]:
        ...

    @typing.overload
    def get_applied_options(self, *, upgrade_id: Union[str, int], bldg_ids: list[int],
                            include_base_opt: Literal[False] = False) -> list[set[str]]:
        ...

    @typing.overload
    def get_applied_options(self, *, upgrade_id: Union[str, int], bldg_ids: list[int],
                            include_base_opt: bool) -> list[Union[dict[str, str], set[str]]]:
        ...

    @validate_arguments(config=dict(arbitrary_types_allowed=True))
    def get_applied_options(self, upgrade_id: Union[str, int], bldg_ids: list[int],
                            include_base_opt: bool = False):
        """Returns the list of options applied to each buildings for a given upgrade.

        Args:
            upgrade_id (int | str): The upgrade for which to find the applied options.
            bldg_ids (list[int]): List of building ids.
            include_base_opt (bool, optional): If baseline value is to be included. Defaults to False.

        Returns:
            list[set|dict]: List of options (along with baseline chars, if include_base_opt is true)
        """
        up_csv = self._bsq.get_upgrades_csv_full(upgrade_id=int(upgrade_id))
        rel_up_csv = up_csv.loc[bldg_ids]
        upgrade_cols = [key for key in up_csv.columns
                        if key.startswith("upgrade_costs.option_") and key.endswith("_name")]

        if include_base_opt:
            base_csv = self._bsq.get_results_csv_full()
            rel_base_csv = base_csv.loc[bldg_ids]
            rel_base_csv = rel_base_csv.rename(columns=lambda c: c.split('.')[1] if '.' in c else c)
            char_df = rel_up_csv[upgrade_cols].fillna('').agg(
                lambda x: {'_'.join(v.split('|')[0].lower().split()) for v in x if v}, axis=1)
            all_chars = [c for c in reduce(set.union, char_df.values) if c in set(rel_base_csv.columns)]
            char_dict: dict[Hashable, dict[str, str]] = rel_base_csv[all_chars].to_dict(orient='index')

            def add_base_chars(options: list):
                bldg_id = options[0]  # first entry is building_id
                return {opt: char_dict[bldg_id].get('_'.join(opt.split('|')[0].lower().split()), '')
                        for opt in options[1:]}
            opt_df: pd.Series = rel_up_csv[upgrade_cols].fillna('').reset_index().agg(lambda x: [v for v in x if v],
                                                                                      axis=1)
            return_val = opt_df.map(add_base_chars).to_list()
        else:
            return_val = rel_up_csv[upgrade_cols].fillna('').agg(lambda x: {v for v in x if v},
                                                                 axis=1).to_list()

        return return_val

    @validate_arguments(config=dict(arbitrary_types_allowed=True))
    def get_enduses_buildings_map_by_change(self, upgrade_id: Union[str, int],
                                            change_type: str = 'changed',
                                            bldg_list: Optional[list[int]] = None) -> dict[str, pd.Index]:
        """Finds the list of enduses and the buildings that had change in the enduses for a given change type.
        Args:
            upgrade (int | stsr): The upgrade to look at.
            change_type (str, optional): The kind of change to look for. Valid values are increased, decreased and
                                         and changed. Defaults to 'changed' which includes both cases.
            bldg_list (list[int], optional): The list of buildings to narrow down to. If omitted, searches through all
                                             all the buildings in the upgrade. Defaults to None.

        Returns:
            dict[str, pd.Index]: Dict mapping enduses that had a given change and building ids showing that change.
        """
        up_csv = self._bsq.get_upgrades_csv_full(upgrade_id=int(upgrade_id))
        bs_csv = self._bsq.get_results_csv_full()
        if bldg_list:
            up_csv = up_csv.loc[bldg_list]
            bs_csv = bs_csv.loc[bldg_list]

        def clean_column(col: str):
            col = col.removeprefix(self._bsq._out_prefix)
            col = col.removeprefix("end_use_")
            col = col.removeprefix("fuel_use_")
            return col

        def get_pure_enduse(col):
            for fuel in FUELS:
                col = col.removeprefix(f"{fuel}_")
            return col

        end_use_cols = [c for c in up_csv.columns if ('end_use' in c) or ('fuel_use' in c) or ('unmet_hours_' in c)]
        up_csv = up_csv[end_use_cols].rename(columns=clean_column)
        bs_csv = bs_csv[end_use_cols].rename(columns=clean_column)

        pure_enduses = {get_pure_enduse(c) for c in up_csv.columns}

        def get_all_fuel_enduses(df, end_use):
            return [col for col in df.columns if col.endswith(end_use)]

        def add_all_fuel_cols(df):
            for end_use in pure_enduses:
                df[f"all_fuel_{end_use}"] = df[get_all_fuel_enduses(df, end_use)].sum(axis=1)
            return df

        add_all_fuel_cols(up_csv)
        add_all_fuel_cols(bs_csv)

        diff = up_csv - bs_csv
        enduses_df = diff.transpose()
        if change_type == 'decreased':
            enduses_df = enduses_df < -1e-12
        elif change_type == 'increased':
            enduses_df = enduses_df > 1e-12
        else:
            enduses_df = enduses_df.abs() > 1e-12
        change_dict = enduses_df.apply(lambda x: enduses_df.columns[x], axis=1).to_dict()
        clean_dict = {key: value for key, value in change_dict.items() if len(value) > 0}
        return clean_dict

Methods

def check_options_integrity(self, yaml_file: str, opt_sat_path: str) ‑> bool

Checks the upgrade/option spec in the buildstock configuration file against what is actually in the simulation result and flags any discrepancy. The verification allows for some mismatch since some simulations could have failed. Unless there is a bug somewhere in the buildstock workflow, the integrity check should pass regardless of the number of failures.

Args

yaml_file : str
The path to buildstock configuration file used to run the simulation

Returns

bool
Whether or not the integrity check passed.
Expand source code
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def check_options_integrity(self, yaml_file: str, opt_sat_path: str) -> bool:
    """ Checks the upgrade/option spec in the buildstock configuration file against what is actually in the
    simulation result and flags any discrepancy. The verificationa allows for some mismatch since some simulations
    could have failed. Unless there is a bug somewhere in buildstock workflow, integrity check should pass
    regardless of number of failures.

    Args:
        yaml_file (str): The path to buildstock configuration file used to run the simulation

    Returns:
        bool: Whether or not the integrity check passed.
    """
    intg_df = self.get_option_integrity_report(yaml_file, opt_sat_path).reset_index()
    all_intg_df = intg_df[intg_df['option'] == 'All']
    blank_opt_upgrades = all_intg_df[all_intg_df['applied_buildings_count'] < all_intg_df['Upgrade Success']]
    assert (all_intg_df['applied_buildings_count'] >= all_intg_df['Upgrade Success']).all()
    if len(blank_opt_upgrades) > 0:
        print_r("BLANK OPTIONS: The following upgrades have fewere 'applied_buildings_count' than 'Upgrade Success'"
                "This indicates that some buildings in these upgrades didn't have any option applied")
    serious = False
    for indx, row in intg_df.iterrows():
        upgrade_failures = row['Upgrade Failures']
        applicable_count = row.applicable_buildings_count
        applied_count = row.applied_buildings_count
        unapplied_count = row.unapplied_bldgs_count
        overapplied_count = row.overapplied_bldgs_count
        if row.unapplied_bldgs_count > 0:
            if row.option == 'All' and row.unapplied_bldgs_count == upgrade_failures:
                print_r(
                    f"OPTION UNDERAPPLICATION: Upgrade {row.upgrade} was was supposed to be applied to "
                    f"{applicable_count} samples but applied to {applied_count} samples")
                print_g(f"This difference of {unapplied_count} exactly matches with {upgrade_failures}"
                        f" failures in Upgrade {row.upgrade}. It's all good.")
            else:
                print_r(f"OPTION UNDERAPPLICATION: Upgrade {row.upgrade}, {row.option} didn't apply to "
                        f"{unapplied_count} samples that it was supposed to apply to.")
                if upgrade_failures > 0:
                    if unapplied_count > upgrade_failures:
                        print_r(f"{upgrade_failures} failures in Upgrade {row.upgrade} can't account for this.")
                        serious = True
                    else:
                        print_g(f"{upgrade_failures} failures in Upgrade {row.upgrade} may account for this.")
                else:
                    serious = True

        if overapplied_count > 0:
            print_r(f"OPTION OVERAPPLICATION: Upgrade {row.upgrade}, {row.option} applied to"
                    f" {unapplied_count} samples that it was supposed to NOT apply to.")
            serious = True
    if not serious:
        print_g("Integrity check passed.")
        return True
    else:
        print_r("Integrity check failed. Please check the serious issues above.")
        return False
def check_ts_bs_integrity(self) ‑> bool

Checks the integrity between the timeseries and baseline (metadata) tables.

Returns

bool
Whether or not the integrity check passed.
Expand source code
def check_ts_bs_integrity(self) -> bool:
    """Checks the integrity between the timeseries and baseline (metadata) tables.

    Returns:
        bool: Whether or not the integrity check passed.
    """
    logger.info("Checking integrity with ts_tables ...")
    raw_ts_report = self._get_ts_report()
    raw_success_report = self.get_success_report(trim_missing_bs=False)
    if self._bsq.db_schema.structure.unapplicables_have_ts:
        bs_dict = raw_success_report[['unapplicable', 'success']].sum(axis=1).to_dict()
    else:
        bs_dict = raw_success_report['success'].to_dict()
    ts_dict = raw_ts_report.to_dict()['success']
    check_pass = True
    for upgrade, count in ts_dict.items():
        if count != bs_dict.get(upgrade, 0):
            print_r(f"Upgrade {upgrade} has {count} samples in timeseries table, but {bs_dict.get(upgrade, 0)}"
                    " samples in baseline/upgrade table.")
            check_pass = False
    if check_pass:
        print_g("Annual and timeseries tables are verified to have the same number of buildings.")
    try:
        rowcount = self._bsq._get_rows_per_building()
        print_g(f"All buildings are verified to have the same number of ({rowcount}) timeseries rows.")
    except ValueError:
        check_pass = False
        print_r("Different buildings have different number of timeseries rows.")
    return check_pass
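
A short usage sketch, reusing the report object constructed in the sketch under check_options_integrity above:

# Verify that the annual (baseline/upgrade) and timeseries tables agree on building and row counts.
ts_ok = report.check_ts_bs_integrity()
print("timeseries/baseline integrity:", "passed" if ts_ok else "failed")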
def get_applied_options(self, upgrade_id: Union[str, int], bldg_ids: list[int], include_base_opt: bool = False)

Returns the list of options applied to each building for a given upgrade.

Args

upgrade_id (int | str): The upgrade for which to find the applied options.
bldg_ids : list[int]
List of building ids.
include_base_opt : bool, optional
If baseline value is to be included. Defaults to False.

Returns

list[set|dict]: List of options (along with baseline chars, if include_base_opt is true)

Expand source code
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def get_applied_options(self, upgrade_id: Union[str, int], bldg_ids: list[int],
                        include_base_opt: bool = False):
    """Returns the list of options applied to each buildings for a given upgrade.

    Args:
        upgrade_id (int | str): The upgrade for which to find the applied options.
        bldg_ids (list[int]): List of building ids.
        include_base_opt (bool, optional): If baseline value is to be included. Defaults to False.

    Returns:
        list[set|dict]: List of options (along with baseline chars, if include_base_opt is true)
    """
    up_csv = self._bsq.get_upgrades_csv_full(upgrade_id=int(upgrade_id))
    rel_up_csv = up_csv.loc[bldg_ids]
    upgrade_cols = [key for key in up_csv.columns
                    if key.startswith("upgrade_costs.option_") and key.endswith("_name")]

    if include_base_opt:
        base_csv = self._bsq.get_results_csv_full()
        rel_base_csv = base_csv.loc[bldg_ids]
        rel_base_csv = rel_base_csv.rename(columns=lambda c: c.split('.')[1] if '.' in c else c)
        char_df = rel_up_csv[upgrade_cols].fillna('').agg(
            lambda x: {'_'.join(v.split('|')[0].lower().split()) for v in x if v}, axis=1)
        all_chars = [c for c in reduce(set.union, char_df.values) if c in set(rel_base_csv.columns)]
        char_dict: dict[Hashable, dict[str, str]] = rel_base_csv[all_chars].to_dict(orient='index')

        def add_base_chars(options: list):
            bldg_id = options[0]  # first entry is building_id
            return {opt: char_dict[bldg_id].get('_'.join(opt.split('|')[0].lower().split()), '')
                    for opt in options[1:]}
        opt_df: pd.Series = rel_up_csv[upgrade_cols].fillna('').reset_index().agg(lambda x: [v for v in x if v],
                                                                                  axis=1)
        return_val = opt_df.map(add_base_chars).to_list()
    else:
        return_val = rel_up_csv[upgrade_cols].fillna('').agg(lambda x: {v for v in x if v},
                                                             axis=1).to_list()

    return return_val
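
A usage sketch, reusing the report object from the first sketch; the upgrade id and building ids are placeholders.

# Options applied to three hypothetical buildings in upgrade 1; with include_base_opt=True the
# result is a list of dicts mapping each applied option to the building's baseline characteristic.
bldg_ids = [11, 42, 97]
applied = report.get_applied_options(upgrade_id=1, bldg_ids=bldg_ids, include_base_opt=True)
for bldg_id, opts in zip(bldg_ids, applied):
    print(bldg_id, opts)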
def get_buildings_by_change(self, *, upgrade_id: int, change_type: Literal['no-chng', 'bad-chng', 'ok-chng', 'true-bad-chng', 'true-ok-chng', 'null', 'any'] = 'no-chng', get_query_only: bool = False)
Expand source code
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def get_buildings_by_change(self, *, upgrade_id: int,
                            change_type: Literal["no-chng", "bad-chng", "ok-chng", "true-bad-chng",
                                                 "true-ok-chng", "null", "any"] = 'no-chng',
                            get_query_only: bool = False):

    if self._bsq.up_table is None:
        raise ValueError("No upgrade table is available .")
    up_query = sa.select([self._bsq.bs_bldgid_column, self._bsq._bs_completed_status_col,
                          self._bsq._up_completed_status_col])
    up_query = up_query.join(self._bsq.up_table, self._bsq.bs_bldgid_column == self._bsq.up_bldgid_column)

    conditions = self._get_change_conditions(change_type)
    up_query = up_query.where(sa.and_(self._bsq._bs_successful_condition,
                                      self._bsq._up_successful_condition,
                                      self._bsq.up_table.c['upgrade'] == str(upgrade_id),
                                      conditions))  # type: ignore
    if get_query_only:
        return self._bsq._compile(up_query)
    df = self._bsq.execute(up_query)
    return df[self._bsq.bs_bldgid_column.name].to_numpy(dtype='int32').tolist()
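
There is no docstring for this method, but the source above shows it returns the building ids matching a given change type for one upgrade. A sketch, reusing the report object from the first sketch (upgrade id is a placeholder):

# Building ids in upgrade 1 whose enduses did not change at all ('no-chng').
unchanged_bldgs = report.get_buildings_by_change(upgrade_id=1, change_type='no-chng')
print(f"{len(unchanged_bldgs)} buildings had no change in upgrade 1")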
def get_enduses_buildings_map_by_change(self, upgrade_id: Union[str, int], change_type: str = 'changed', bldg_list: Optional[list[int]] = None) ‑> dict[str, pandas.core.indexes.base.Index]

Finds the list of enduses and the buildings that had a change in those enduses for a given change type.

Args

upgrade_id (int | str): The upgrade to look at.
change_type : str, optional
The kind of change to look for. Valid values are 'increased', 'decreased' and 'changed'. Defaults to 'changed' which includes both cases.
bldg_list : list[int], optional
The list of buildings to narrow down to. If omitted, searches through all the buildings in the upgrade. Defaults to None.

Returns

dict[str, pd.Index]
Dict mapping enduses that had a given change and building ids showing that change.
Expand source code
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def get_enduses_buildings_map_by_change(self, upgrade_id: Union[str, int],
                                        change_type: str = 'changed',
                                        bldg_list: Optional[list[int]] = None) -> dict[str, pd.Index]:
    """Finds the list of enduses and the buildings that had change in the enduses for a given change type.
    Args:
        upgrade (int | stsr): The upgrade to look at.
        change_type (str, optional): The kind of change to look for. Valid values are increased, decreased and
                                     and changed. Defaults to 'changed' which includes both cases.
        bldg_list (list[int], optional): The list of buildings to narrow down to. If omitted, searches through all
                                         all the buildings in the upgrade. Defaults to None.

    Returns:
        dict[str, pd.Index]: Dict mapping enduses that had a given change and building ids showing that change.
    """
    up_csv = self._bsq.get_upgrades_csv_full(upgrade_id=int(upgrade_id))
    bs_csv = self._bsq.get_results_csv_full()
    if bldg_list:
        up_csv = up_csv.loc[bldg_list]
        bs_csv = bs_csv.loc[bldg_list]

    def clean_column(col: str):
        col = col.removeprefix(self._bsq._out_prefix)
        col = col.removeprefix("end_use_")
        col = col.removeprefix("fuel_use_")
        return col

    def get_pure_enduse(col):
        for fuel in FUELS:
            col = col.removeprefix(f"{fuel}_")
        return col

    end_use_cols = [c for c in up_csv.columns if ('end_use' in c) or ('fuel_use' in c) or ('unmet_hours_' in c)]
    up_csv = up_csv[end_use_cols].rename(columns=clean_column)
    bs_csv = bs_csv[end_use_cols].rename(columns=clean_column)

    pure_enduses = {get_pure_enduse(c) for c in up_csv.columns}

    def get_all_fuel_enduses(df, end_use):
        return [col for col in df.columns if col.endswith(end_use)]

    def add_all_fuel_cols(df):
        for end_use in pure_enduses:
            df[f"all_fuel_{end_use}"] = df[get_all_fuel_enduses(df, end_use)].sum(axis=1)
        return df

    add_all_fuel_cols(up_csv)
    add_all_fuel_cols(bs_csv)

    diff = up_csv - bs_csv
    enduses_df = diff.transpose()
    if change_type == 'decreased':
        enduses_df = enduses_df < -1e-12
    elif change_type == 'increased':
        enduses_df = enduses_df > 1e-12
    else:
        enduses_df = enduses_df.abs() > 1e-12
    change_dict = enduses_df.apply(lambda x: enduses_df.columns[x], axis=1).to_dict()
    clean_dict = {key: value for key, value in change_dict.items() if len(value) > 0}
    return clean_dict
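
A usage sketch, reusing the report object from the first sketch; the upgrade id is a placeholder.

# Map each enduse that increased in upgrade 1 to the building ids showing that increase;
# pass bldg_list=[...] to narrow the search to specific buildings.
increased_map = report.get_enduses_buildings_map_by_change(upgrade_id=1, change_type='increased')
for enduse, bldgs in increased_map.items():
    print(enduse, len(bldgs))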
def get_option_integrity_report(self, yaml_file: str, opt_sat_path: str) ‑> pandas.core.frame.DataFrame

Checks the upgrade/option spec in the buildstock configuration file against what is actually in the simulation result and tabulates the discrepancy.

Args

yaml_file : str
The path to buildstock configuration file used to run the simulation

Returns

pd.DataFrame
The report dataframe.
Expand source code
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def get_option_integrity_report(self, yaml_file: str, opt_sat_path: str) -> pd.DataFrame:
    """Checks the upgrade/option spec in the buildstock configuration file against what is actually in the
    simulation result and tabulates the discrepancy.
    Args:
        yaml_file (str): The path to buildstock configuration file used to run the simulation

    Returns:
        pd.DataFrame: The report dataframe.
    """
    ua_df = self._bsq.get_upgrades_analyzer(yaml_file, opt_sat_path).get_report()
    ua_df = ua_df.groupby(['upgrade', 'option']).aggregate({'applicable_to': 'sum',
                                                            'applicable_buildings': lambda x: reduce(set.union, x)})
    assert (ua_df['applicable_to'] == ua_df['applicable_buildings'].map(lambda x: len(x))).all()
    opt_report_df = self.get_options_report().fillna(0)
    opt_report_df = opt_report_df.set_index(['upgrade', 'option'])
    diff_df = pd.DataFrame(index=ua_df.index)
    diff_df['applicable_buildings'] = ua_df['applicable_buildings']
    diff_df['applied_buildings'] = opt_report_df['applied_buildings']
    diff_df['overapplied_bldgs'] = opt_report_df['applied_buildings'] - ua_df['applicable_buildings']
    diff_df['unapplied_bldgs'] = ua_df['applicable_buildings'] - opt_report_df['applied_buildings']
    for col in diff_df.columns:
        diff_df[f"{col}_count"] = diff_df[col].map(lambda x: len(x) if isinstance(x, set) else 0)
    success_report_df = self.get_success_report()
    fail_report = success_report_df[['fail', 'success']].rename(columns={'fail': "Upgrade Failures",
                                                                         'success': "Upgrade Success"})
    diff_df = diff_df.join(fail_report)
    return diff_df
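
A usage sketch, reusing the report object from the first sketch; file paths are placeholders.

# Tabulate the discrepancy between the yaml's apply-logic and what the simulation actually applied.
intg_df = report.get_option_integrity_report('project.yml', 'options_saturation.csv')
print(intg_df[['applicable_buildings_count', 'applied_buildings_count', 'unapplied_bldgs_count']])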
def get_options_report(self, trim_missing_bs: bool = True) ‑> pandas.core.frame.DataFrame

Finds out the number and list of buildings each of the options applied to.

Args

trim_missing_bs : bool, optional
Whether the buildings that are not available in the baseline should be dropped. Defaults to True.

Returns

pd.DataFrame
The list of options and the corresponding set of building ids each option applied to.
Expand source code
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def get_options_report(self, trim_missing_bs: bool = True) -> pd.DataFrame:
    """Finds out the number and list of buildings each of the options applied to.

    Args:
        trim_missing_bs (bool, optional): Whether the buildings that are not available in the baseline should be
                                          dropped. Defaults to True.
    Returns:
        pd.DataFrame: The list of options and the corresponding set of building ids each option applied to.
    """
    if self._bsq.up_table is None:
        raise ValueError("No upgrade table is available .")

    full_report = self._get_full_options_report(trim_missing_bs=trim_missing_bs)
    option_cols = [c for c in full_report.columns if c.startswith("option")]
    total_counts: Counter = Counter()
    bldg_array: dict = defaultdict(list)
    for option in option_cols:
        grouped_dict = full_report.groupby(['upgrade', option]).aggregate({'success': 'sum',
                                                                           'applied_buildings': 'sum'}).to_dict()
        total_counts += Counter(grouped_dict['success'])
        for key, val in grouped_dict['applied_buildings'].items():
            bldg_array[key] += val

    option_df = pd.DataFrame.from_dict({'success': total_counts,
                                        'applied_buildings': bldg_array,
                                        }, orient='columns')
    option_df['applied_buildings'] = option_df['applied_buildings'].map(lambda x: set(x))
    option_df = option_df.reset_index()
    option_df.columns = ['upgrade', 'option', 'success', 'applied_buildings']
    # Aggregate for upgrade
    agg = option_df.groupby('upgrade').aggregate({'applied_buildings': lambda x: reduce(set.union, x)})
    agg = agg.reset_index()
    agg.insert(1, 'success', agg['applied_buildings'].map(lambda x: len(x)))
    agg.insert(0, 'option', 'All')
    full_df = pd.concat([option_df, agg])
    full_df = full_df.sort_values(['upgrade', 'option'])
    return full_df
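
A usage sketch, reusing the report object from the first sketch:

# Number of buildings each option (plus an 'All' row per upgrade) applied to, keeping only
# buildings with a successful baseline run.
options_df = report.get_options_report(trim_missing_bs=True)
print(options_df[['upgrade', 'option', 'success']].head())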
def get_success_report(self, trim_missing_bs: Union[Literal['auto'], bool] = 'auto', get_query_only: bool = False)

Returns a basic report showing the number of successes and failures for each upgrade along with percentages. Additional information regarding the number of buildings to which the upgrade applied and whether the enduses changed is also returned.

Args

trim_missing_bs (str | bool, optional): Whether the buildings that failed in the baseline should be
dropped from the upgrades. If true, all metrics are calculated after
those buildings are dropped from the upgrades. Defaults to 'auto'.
get_query_only : bool, optional
If true, returns SQL query instead of the report. Defaults to False.

Raises

ValueError
If something went wrong.

Returns

pd.DataFrame
The report dataframe. The meaning of the various columns is as follows: Fail: Number of simulations that failed.
unapplicable: Number of buildings to which the upgrade didn't apply (because of apply logic).
Success: The number of buildings which completed simulation successfully. No simulation is run for unapplicable buildings.
Sum: Sum of the first three columns.
Applied %: Success / Sum * 100 %.
no-chng: Number of successful simulations that didn't have any change in values for any enduses.
bad-chng: Number of successful simulations that had bad changes. It's considered a bad change if none of the fuels has any reduction in energy consumption, and at least one fuel has an increase in energy consumption.
ok-chng: |set(success) - set(no-chng) - set(bad-chng)| i.e. count of successful simulations that are neither no-chng nor bad-chng.
true-bad-chng: Count of only those bad changes in which neither the unmet cooling nor heating hours decreased. In other words, the increase in energy consumption in one of the fuel types (often electricity - for electrification upgrades) didn't result in improvement of unmet cooling/heating hours.
true-ok-chng: Adjustment of ok-chng after using true-bad-chng instead of bad-chng.
null: Included for testing/integrity-checking purposes. It refers to the number of buildings that are neither no-chng, bad-chng, nor ok-chng. It should always be zero.
any: Sum of no-chng + bad-chng + ok-chng. Refers to any change (including no-change).
x-chng %: The percentage form of the change calculated using the success count as the base.
Expand source code
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def get_success_report(self, trim_missing_bs: Union[Literal['auto'], bool] = 'auto',
                       get_query_only: bool = False):
    """Returns a basic report showing number of success and failures for each upgrade along with percentage.
    Additional information regarding number of buildings to which the upgrade applied and whether the enduses
    changed is also returned.

    Args:
        trim_missing_bs (str | bool, optional): Whether the buildings that failed in the baseline should be
                                                dropped from the upgrades. If true, all metrics are calculated after
                                                those buildings are dropped from the upgrades. Defaults to 'auto'.
        get_query_only (bool, optional): If true, returns SQL query instead of the report. Defaults to False.

    Raises:
        ValueError: If something went wrong.

    Returns:
        pd.DataFrame: The report dataframe. The meaning of the various columns is as follows:
            **Fail**: Number of simulations that failed.  
            **unapplicable**: Number of buildings to which the upgrade didn't apply (because of apply logic).  
            **Success**: The number of buildings which completed simulation successfully. No simulation is run for
                unapplicable buildings.  
            **Sum**: Sum of the first three columns.  
            **Applied %**: Success / Sum * 100 %.  
            **no-chng**: Number of successful simulations that didn't have any change in values for any enduses.  
            **bad-chng:** Number of successful simulations that had bad changes. It's considered a bad change if
               none of the fuels has any reduction in energy consumption, and at least one fuel has an
               increase in energy consumption.  
            **ok-chng:** |set(success) - set(no-chng) - set(bad-chng)| i.e. count of successful simulations that are
                     neither no-chng nor bad-chng.  
            **true-bad-chng:** Count of only those bad changes in which neither the unmet cooling nor heating hours
                           decreased. In other words, the increase in energy consumption in one of the fuel types
                           (often electricity - for electrification upgrades) didn't result in improvement of
                           unmet cooling/heating hours.  
            **true-ok-chng:** Adjustment of ok-chng after using true-bad-chng instead of bad-chng.  
            **null**: Included for testing/integrity-checking purposes. It refers to the number of buildings that
                  are neither no-chng, bad-chng, nor ok-chng. It should always be zero.  
            **any**: Sum of no-chng + bad-chng + ok-chng. Refers to any change (including no-change).  
            **x-chng %**: The percentage form of the change calculated using the success count as the base.  
    """  # noqa: W291

    baseline_result = self._get_bs_success_report()

    if self._bsq.up_table is None:
        return baseline_result

    if trim_missing_bs == 'auto':
        if 'success' in baseline_result:
            trim = True
        else:
            logger.warning("None of the simulation was successful in baseline. The counts for upgrade will be"
                           " returned without requiring corresponding successful baseline run.")
            trim = False
    elif isinstance(trim_missing_bs, bool):
        trim = trim_missing_bs
    else:
        assert_never(trim_missing_bs)
        raise ValueError("trim_missing_bs must be either True/False or 'auto'.")

    if get_query_only:
        baseline_query = self._get_bs_success_report(get_query_only=True)
        upgrade_query = self._get_up_success_report(trim_missing_bs=trim, get_query_only=True)
        change_query = self._get_change_report(get_query_only=True)
        return baseline_query, upgrade_query, change_query

    upgrade_result = self._get_up_success_report(trim_missing_bs=trim).fillna(0)
    change_result = self._get_change_report().fillna(0)
    if get_query_only:
        return baseline_result, upgrade_result, change_result
    if 'success' in upgrade_result.columns:
        pa = round(100 * (upgrade_result['fail'] + upgrade_result['success']) /
                   upgrade_result['Sum'], 1)
        upgrade_result['Applied %'] = pa

    pf = pd.concat([baseline_result, upgrade_result])
    pf = pf.join(change_result).fillna(0)
    pf['no-chng %'] = round(100 * pf['no-chng'] / pf['success'], 1)
    pf['bad-chng %'] = round(100 * pf['bad-chng'] / pf['success'], 1)
    pf['ok-chng %'] = round(100 * pf['ok-chng'] / pf['success'], 1)
    pf['true-ok-chng %'] = round(100 * pf['true-ok-chng'] / pf['success'], 1)
    pf['true-bad-chng %'] = round(100 * pf['true-bad-chng'] / pf['success'], 1)
    return pf
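
A usage sketch, reusing the report object from the first sketch; exactly which columns appear depends on the run's completion statuses and on whether an upgrade table exists.

# Success/failure/change summary per upgrade; pass get_query_only=True to get the underlying
# baseline, upgrade, and change queries instead of the dataframe.
success_df = report.get_success_report(trim_missing_bs='auto')
print(success_df.filter(['success', 'fail', 'no-chng %', 'bad-chng %']))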
def get_successful_simulation_count(self, *, restrict: Sequence[tuple[Union[sqlalchemy.sql.elements.Label, sqlalchemy.sql.schema.Column, str, MappedColumn], Union[str, int, Sequence[Union[int, str]]]]] = Field(default_factory=list), get_query_only: bool = False)

Returns the count of successful simulations for the given restrict conditions in the baseline.

Args

restrict
The list of where conditions to restrict the results to. It should be specified as a list of tuples. Example: [('state',['VA','AZ']), ("build_existing_model.lighting",['60% CFL']), ...]
get_query_only
If set to true, returns the list of queries to run instead of the result.

Returns

Pandas integer counting the number of successful simulations

Expand source code
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def get_successful_simulation_count(self, *,
                                    restrict: Sequence[tuple[AnyColType,
                                                             Union[str, int, Sequence[Union[int, str]]]]] =
                                    Field(default_factory=list),
                                    get_query_only: bool = False):
    """
    Returns the count of successful simulations for the given restrict conditions in the baseline.
    Args:
        restrict: The list of where conditions to restrict the results to. It should be specified as a list of tuples.
                  Example: `[('state',['VA','AZ']), ("build_existing_model.lighting",['60% CFL']), ...]`
        get_query_only: If set to true, returns the list of queries to run instead of the result.

    Returns:
        Pandas integer counting the number of successful simulations
    """
    query = sa.select(safunc.count().label("count"))

    restrict = list(restrict) if restrict else []
    restrict.insert(0, (self._bsq.db_schema.column_names.completed_status,
                        [self._bsq.db_schema.completion_values.success]))
    query = self._bsq._add_restrict(query, restrict, bs_only=True)
    if get_query_only:
        return self._bsq._compile(query)

    return self._bsq.execute(query)
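
A usage sketch, reusing the report object from the first sketch; the restrict values are placeholders.

# Count successful baseline simulations in two states.
count_df = report.get_successful_simulation_count(restrict=[('state', ['VA', 'AZ'])])
print(count_df)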
def print_change_details(self, upgrade_id: int, yml_file: str, opt_sat_path: str, change_type: Literal['no-chng', 'bad-chng', 'ok-chng', 'true-bad-chng', 'true-ok-chng', 'null', 'any'] = 'no-chng')
Expand source code
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def print_change_details(self, upgrade_id: int, yml_file: str, opt_sat_path: str,
                         change_type: Literal["no-chng", "bad-chng", "ok-chng", "true-bad-chng",
                                              "true-ok-chng", "null", "any"] = 'no-chng'):
    ua = self._bsq.get_upgrades_analyzer(yml_file, opt_sat_path)
    bad_bids = self.get_buildings_by_change(upgrade_id=upgrade_id, change_type=change_type)
    good_bids = self.get_buildings_by_change(upgrade_id=upgrade_id, change_type='ok-chng')
    ua.print_unique_characteristic(upgrade_id, change_type, good_bids, bad_bids)
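
There is no docstring for this method, but the source above shows it prints the characteristics that distinguish buildings with the given change type from the 'ok-chng' buildings in an upgrade. A sketch, reusing the report object from the first sketch; file paths and the upgrade id are placeholders.

# Investigate why some buildings in upgrade 1 showed no change in any enduse.
report.print_change_details(upgrade_id=1, yml_file='project.yml',
                            opt_sat_path='options_saturation.csv', change_type='no-chng')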