Package buildstock_query

BuildStockQuery


A library to run AWS Athena queries to get various data from a BuildStock run. The main class is called BuildStockQuery. An object of BuildStockQuery needs to be created to perform various queries. In addition to supporting various query member functions, the BuildStockQuery object contains 4 member objects that can be used to perform certain classes of queries and analysis. These 4 member objects can be accessed as follows:

bsq = BuildStockQuery(...)  # BuildStockQuery object
bsq.agg      # BuildStockAggregate
bsq.report   # BuildStockReport
bsq.savings  # BuildStockSavings
bsq.utility  # BuildStockUtility

# Some basic queries can be done directly using the BuildStockQuery object. For example:
from buildstock_query import BuildStockQuery 
bsq = BuildStockQuery(...)
bsq.get_results_csv()
bsq.get_upgrades_csv()

# Other more specific queries can be done using specific query class objects. For example:
bsq.agg.aggregate_annual(...)
bsq.agg.aggregate_timeseries(...)
...
bsq.report.get_success_report(...)
bsq.report.get_successful_simulation_count(...)
...
bsq.savings.savings_shape(...)
...
bsq.utility.aggregate_annual_by_eiaid(...)

In addition, the library also exposes UpgradesAnalyzer. It can be used to perform quality checks on the apply logic in a buildstock configuration file.

from buildstock_query import UpgradesAnalyzer
ua = UpgradesAnalyzer(yaml_file='my_buildstock_configuration.yml', buildstock='my_buildstock.csv')
options_report = ua.get_report()
options_report.drop(columns=['applicable_buildings']).to_csv('options_report.csv')
ua.save_detailed_report('detailed_report.csv')

UpgradesAnalyzer is also exposed as a script and can be invoked directly from the command line (from the environment buildstock_query is installed in):

>>>upgrades_analyzer
Welcome to upgrades analyzer
...

There is also another experimental tool, buildstock_query.tools.upgrades_visualizer, available from the command line. The tool starts a localhost Plotly Dash dashboard that can be used for analytic visualization of annual results for different upgrades.

>>>upgrades_visualizer
Welcome to upgrades visualizer
...
Expand source code
"""
# BuildStockQuery
---------
A library to run AWS Athena queries to get various data from a BuildStock run. The main class is called BuildStockQuery.
An object of BuildStockQuery needs to be created to perform various queries. In addition to supporting various
query member functions, the BuildStockQuery object contains 4 member objects that can be used to perform certain
classes of queries and analysis. These 4 member objects can be accessed as follows::

bsq = BuildStockQuery(...)  `BuildStockQuery` object  
bsq.agg  `buildstock_query.aggregate_query.BuildStockAggregate`  
bsq.report  `buildstock_query.report_query.BuildStockReport`  
bsq.savings  `buildstock_query.savings_query.BuildStockSavings`  
bsq.utility  `buildstock_query.utility_query.BuildStockUtility`  

```
# Some basic queries can be done directly using the BuildStockQuery object. For example:
from buildstock_query import BuildStockQuery 
bsq = BuildStockQuery(...)
bsq.get_results_csv()
bsq.get_upgrades_csv()

# Other more specific queries can be done using specific query class objects. For example:
bsq.agg.aggregate_annual(...)
bsq.agg.aggregate_timeseries(...)
...
bsq.report.get_success_report(...)
bsq.report.get_successful_simulation_count(...)
...
bsq.savings.savings_shape(...)
...
bsq.utility.aggregate_annual_by_eiaid(...)
```

In addition, the library also exposes `buildstock_query.tools.upgrades_analyzer.UpgradesAnalyzer`. It can be used to
perform quality checks on the apply logic in a buildstock configuration file.
```
from buildstock_query import UpgradesAnalyzer
ua = UpgradesAnalyzer(yaml_file='my_buildstock_configuration.yml', buildstock='my_buildstock.csv')
options_report = ua.get_report()
options_report.drop(columns=['applicable_buildings']).to_csv('options_report.csv')
ua.save_detailed_report('detailed_report.csv')
```

`buildstock_query.tools.upgrades_analyzer.UpgradesAnalyzer` is also exposed as a script and can be invoked directly
from the command line (from the environment buildstock_query is installed in):
```
>>>upgrades_analyzer
Welcome to upgrades analyzer
...
```

There is also another experimental tool, `buildstock_query.tools.upgrades_visualizer`, available from the command line.
The tool starts a localhost Plotly Dash dashboard that can be used for analytic visualization of annual results for
different upgrades.
```
>>>upgrades_visualizer
Welcome to upgrades visualizer
...
```
"""  # noqa: W291
from buildstock_query.schema.utilities import MappedColumn
from buildstock_query.query_core import ExeId
from buildstock_query.main import BuildStockQuery
from buildstock_query.tools import UpgradesAnalyzer
from buildstock_query.helpers import KWH2MBTU
from buildstock_query.helpers import MBTU2KWH
__all__ = ['BuildStockQuery', 'UpgradesAnalyzer', 'KWH2MBTU', 'MBTU2KWH', 'ExeId', 'MappedColumn']

Sub-modules

buildstock_query.aggregate_query
buildstock_query.db_schema
buildstock_query.helpers
buildstock_query.main
buildstock_query.query_core
buildstock_query.report_query
buildstock_query.savings_query
buildstock_query.schema
buildstock_query.tools
buildstock_query.utility_query

Classes

class BuildStockQuery (workgroup: str, db_name: str, table_name: Union[str, tuple[str, Optional[str], Optional[str]]], db_schema: Optional[str] = None, buildstock_type: Literal['resstock', 'comstock'] = 'resstock', sample_weight: Union[int, float, ForwardRef(None)] = None, region_name: str = 'us-west-2', execution_history: Optional[str] = None, skip_reports: bool = False, athena_query_reuse: bool = True)

A class to run Athena queries for BuildStock runs and download results as pandas DataFrame.

Args

workgroup : str
The workgroup for Athena. Query costs are charged to this workgroup.
db_name : str
The Athena database name
buildstock_type : str, optional
'resstock' or 'comstock' runs. Defaults to 'resstock'
table_name : str or tuple[str, Optional[str], Optional[str]]
If a single string is provided, say 'mfm_run', then it must correspond to tables in Athena named mfm_run_baseline and, optionally, mfm_run_timeseries and mfm_run_upgrades. Alternatively, a tuple of three elements can be provided with the table names for baseline, timeseries and upgrade. Timeseries and upgrade can be None if no such table exists.
db_schema : str, optional
The database structure in Athena is different between ResStock and ComStock run. It is also different between the version in OEDI and default version from BuildStockBatch. This argument controls the assumed schema. Allowed values are 'resstock_default', 'resstock_oedi', 'comstock_default' and 'comstock_oedi'. Defaults to 'resstock_default' for resstock and 'comstock_default' for comstock.
sample_weight : int or float, optional
Specify a custom sample_weight. Otherwise, the default is 1 for ComStock, while ResStock uses the sample_weight stored in the run.
region_name : str, optional
The AWS region where the database exists. Defaults to 'us-west-2'.
execution_history : str, optional
A temporary file used to record the query executions started by the user, so that they can be stopped if needed. Uses .execution_history if not supplied. Generally, a custom filename is not required.
skip_reports : bool, optional
If True, skips report printing during initialization. If False (default), prints the report from BuildStockReport.get_success_report().
athena_query_reuse : bool, optional
When True, Athena will make use of its built-in 7-day query cache; when False, it will not. Defaults to True. One use case for setting this to False is when you have modified the underlying S3 data or Glue schema and want to be sure you are not using cached results.
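
For illustration, a minimal construction might look like the following; the workgroup, database, and table names below are placeholders, not real resources.

# Hypothetical names, shown for illustration only
from buildstock_query import BuildStockQuery
bsq = BuildStockQuery(workgroup='my_workgroup',
                      db_name='my_database',
                      table_name='my_run')  # expects my_run_baseline (+ optional my_run_timeseries, my_run_upgrades)

# Or pass explicit table names for baseline, timeseries and upgrade; the latter two may be None
bsq = BuildStockQuery(workgroup='my_workgroup',
                      db_name='my_database',
                      table_name=('my_run_baseline', 'my_run_timeseries', None))
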
Expand source code
class BuildStockQuery(QueryCore):

    @validate_arguments(config=dict(smart_union=True))
    def __init__(self,
                 workgroup: str,
                 db_name: str,
                 table_name: Union[str, tuple[str, Optional[str], Optional[str]]],
                 db_schema: Optional[str] = None,
                 buildstock_type: Literal['resstock', 'comstock'] = 'resstock',
                 sample_weight: Optional[Union[int, float]] = None,
                 region_name: str = 'us-west-2',
                 execution_history: Optional[str] = None,
                 skip_reports: bool = False,
                 athena_query_reuse: bool = True,
                 ) -> None:
        """A class to run Athena queries for BuildStock runs and download results as pandas DataFrame.

        Args:
            workgroup (str): The workgroup for Athena. Query costs are charged to this workgroup.
            db_name (str): The Athena database name
            buildstock_type (str, optional): 'resstock' or 'comstock' runs. Defaults to 'resstock'
            table_name (str or tuple[str, Optional[str], Optional[str]]): If a single string is provided,
            say, 'mfm_run', then it must correspond to tables in Athena named mfm_run_baseline and optionally
            mfm_run_timeseries and mfm_run_upgrades. Alternatively, a tuple of three elements can be provided with
            the table names for baseline, timeseries and upgrade. Timeseries and upgrade can be None if no such
            table exists.
            db_schema (str, optional): The database structure in Athena is different between ResStock and ComStock run.
                It is also different between the version in OEDI and default version from BuildStockBatch. This argument
                controls the assumed schema. Allowed values are 'resstock_default', 'resstock_oedi', 'comstock_default'
                and 'comstock_oedi'. Defaults to 'resstock_default' for resstock and 'comstock_default' for comstock.
            sample_weight (int or float, optional): Specify a custom sample_weight. Otherwise, the default is 1 for
                ComStock, while ResStock uses the sample_weight stored in the run.
            region_name (str, optional): The AWS region where the database exists. Defaults to 'us-west-2'.
            execution_history (str, optional): A temporary file used to record the query executions started by the
                user, so that they can be stopped if needed. Uses .execution_history if not supplied. Generally, a
                custom filename is not required.
            skip_reports (bool, optional): If True, skips report printing during initialization. If False (default),
                prints the report from `buildstock_query.report_query.BuildStockReport.get_success_report`.
            athena_query_reuse (bool, optional): When True, Athena will make use of its built-in 7-day query cache;
                when False, it will not. Defaults to True. One use case for setting this to False is when you have
                modified the underlying S3 data or Glue schema and want to be sure you are not using cached results.
        """
        db_schema = db_schema or f"{buildstock_type}_default"
        self.params = BSQParams(
            workgroup=workgroup,
            db_name=db_name,
            buildstock_type=buildstock_type,
            table_name=table_name,
            db_schema=db_schema,
            sample_weight_override=sample_weight,
            region_name=region_name,
            execution_history=execution_history,
            athena_query_reuse=athena_query_reuse
        )
        self._run_params = self.params.get_run_params()
        from buildstock_query.report_query import BuildStockReport
        from buildstock_query.aggregate_query import BuildStockAggregate
        from buildstock_query.savings_query import BuildStockSavings
        from buildstock_query.utility_query import BuildStockUtility

        super().__init__(params=self._run_params)
        #: `buildstock_query.report_query.BuildStockReport` object to perform report queries
        self.report: BuildStockReport = BuildStockReport(self)
        #: `buildstock_query.aggregate_query.BuildStockAggregate` object to perform aggregate queries
        self.agg: BuildStockAggregate = BuildStockAggregate(self)
        #: `buildstock_query.savings_query.BuildStockSavings` object to perform savings queries
        self.savings = BuildStockSavings(self)
        #: `buildstock_query.utility_query.BuildStockUtility` object to perform utility queries
        self.utility = BuildStockUtility(self)

        self._char_prefix = self.db_schema.column_prefix.characteristics
        self._out_prefix = self.db_schema.column_prefix.output

        if not skip_reports:
            logger.info("Getting Success counts...")
            print(self.report.get_success_report())
            if self.ts_table is not None:
                self.report.check_ts_bs_integrity()
            self.save_cache()

    def get_buildstock_df(self) -> pd.DataFrame:
        """Returns the building characteristics data by quering Athena tables using the same format as that produced
        by the sampler and written as buildstock.csv. It only includes buildings with successful simulation.
        Returns:
            pd.DataFrame: The buildstock.csv dataframe.
        """
        results_df = self.get_results_csv_full()
        results_df = results_df[results_df[self.db_schema.column_names.completed_status].astype(str).str.lower() ==
                                self.db_schema.completion_values.success.lower()]
        buildstock_cols = [c for c in results_df.columns if c.startswith(self._char_prefix)]
        buildstock_df = results_df[buildstock_cols]
        buildstock_cols = [''.join(c.split(".")[1:]).replace("_", " ") for c in buildstock_df.columns
                           if c.startswith(self._char_prefix)]
        buildstock_df.columns = buildstock_cols
        return buildstock_df

    @validate_arguments
    def get_upgrades_analyzer(self, yaml_file: str, opt_sat_file: str) -> UpgradesAnalyzer:
        """
            Returns the UpgradesAnalyzer object with buildstock.csv downloaded from Athena (see get_buildstock_df help)

        Args:
            yaml_file (str): The path to the buildstock configuration file.
            opt_sat_file (str): The path to the opt_saturation.csv file for the housing characteristics.

        Returns:
            UpgradesAnalyzer: returns UpgradesAnalyzer object. See UpgradesAnalyzer.
        """

        buildstock_df = self.get_buildstock_df()
        ua = UpgradesAnalyzer(buildstock=buildstock_df, yaml_file=yaml_file, opt_sat_file=opt_sat_file)
        return ua

    @typing.overload
    def _get_rows_per_building(self, get_query_only: Literal[False] = False) -> int:
        ...

    @typing.overload
    def _get_rows_per_building(self, get_query_only: Literal[True]) -> str:
        ...

    @validate_arguments
    def _get_rows_per_building(self, get_query_only: bool = False) -> Union[int, str]:
        select_cols = []
        if self.up_table is not None and self.ts_table is not None:
            select_cols.append(self.ts_table.c['upgrade'])
        select_cols.extend((self.ts_bldgid_column, safunc.count().label("row_count")))
        ts_query = sa.select(select_cols)
        if self.up_table is not None:
            ts_query = ts_query.group_by(sa.text('1'), sa.text('2'))
        else:
            ts_query = ts_query.group_by(sa.text('1'))

        if get_query_only:
            return self._compile(ts_query)
        df = self.execute(ts_query)
        if (df['row_count'] == df['row_count'][0]).all():  # verify all buildings got the same number of rows
            return df['row_count'][0]
        else:
            raise ValueError("Not all buildings have the same number of rows.")

    @validate_arguments(config=dict(smart_union=True))
    def get_distinct_vals(self, column: str, table_name: Optional[str] = None,
                          get_query_only: bool = False) -> Union[str, pd.Series]:
        """
            Find distinct vals.
        Args:
            column (str): The column in the table for which distinct vals are needed.
            table_name (str, optional): The table in Athena. Defaults to baseline table.
            get_query_only (bool, optional): If true, only returns the SQL query. Defaults to False.

        Returns:
            pd.Series: The distinct vals.
        """
        table_name = self.bs_table.name if table_name is None else table_name
        tbl = self._get_table(table_name)
        query = sa.select(tbl.c[column]).distinct()
        if get_query_only:
            return self._compile(query)

        r = self.execute(query, run_async=False)
        return r[column]

    @validate_arguments(config=dict(smart_union=True))
    def get_distinct_count(self, column: str, table_name: Optional[str] = None,
                           get_query_only: bool = False) -> Union[pd.DataFrame, str]:
        """
            Find distinct counts.
        Args:
            column (str): The column in the table for which distinct counts are needed.
            table_name (str, optional): The table in Athena. Defaults to baseline table.
            get_query_only (bool, optional): If true, only returns the SQL query. Defaults to False.

        Returns:
            pd.DataFrame: The distinct counts.
        """
        tbl = self.bs_table if table_name is None else self._get_table(table_name)
        query = sa.select([tbl.c[column], safunc.sum(1).label("sample_count"),
                           safunc.sum(self.sample_wt).label("weighted_count")])
        query = query.group_by(tbl.c[column]).order_by(tbl.c[column])
        if get_query_only:
            return self._compile(query)

        r = self.execute(query, run_async=False)
        return r

    @typing.overload
    def get_results_csv(self, *,
                        restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = Field(
                            default_factory=list),
                        get_query_only: Literal[False] = False) -> pd.DataFrame:
        ...

    @typing.overload
    def get_results_csv(self, *,
                        get_query_only: Literal[True],
                        restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = Field(
                            default_factory=list),
                        ) -> str:
        ...

    @typing.overload
    def get_results_csv(self, *,
                        get_query_only: bool,
                        restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = Field(
                            default_factory=list),
                        ) -> Union[str, pd.DataFrame]:
        ...

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def get_results_csv(self,
                        restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = Field(
                            default_factory=list),
                        get_query_only: bool = False) -> Union[pd.DataFrame, str]:
        """
        Returns the results_csv table for the BuildStock run
        Args:
            restrict (List[Tuple[str, Union[List, str, int]]], optional): The list of where conditions to restrict the
                results to. It should be specified as a list of tuples.
                      Example: `[('state',['VA','AZ']), ("build_existing_model.lighting",['60% CFL']), ...]`
            get_query_only (bool): If set to true, returns the SQL query instead of the result.

        Returns:
            Pandas dataframe that is a subset of the results csv, restricted by the provided conditions
        """
        restrict = list(restrict) if restrict else []
        query = sa.select(['*']).select_from(self.bs_table)
        query = self._add_restrict(query, restrict, bs_only=True)
        compiled_query = self._compile(query)
        if get_query_only:
            return compiled_query
        self._session_queries.add(compiled_query)
        if compiled_query in self._query_cache:
            return self._query_cache[compiled_query].copy().set_index(self.bs_bldgid_column.name)
        logger.info("Making results_csv query ...")
        result = self.execute(query)
        return result.set_index(self.bs_bldgid_column.name)

    def _download_results_csv(self) -> str:
        """Downloads the results csv from s3 and returns the path to the downloaded file.
        Returns:
            str: The path to the downloaded file.
        """
        local_copy_path = self.cache_folder / f"{self.db_name}_{self.bs_table.name}.parquet"
        if os.path.exists(local_copy_path):
            return local_copy_path

        if isinstance(self.table_name, str):
            db_table_name = f'{self.table_name}{self.db_schema.table_suffix.baseline}'
        else:
            db_table_name = self.table_name[0]
        baseline_path = self._aws_glue.get_table(DatabaseName=self.db_name,
                                                 Name=db_table_name)['Table']['StorageDescriptor']['Location']
        bucket = baseline_path.split('/')[2]
        key = '/'.join(baseline_path.split('/')[3:])
        s3_data = self._aws_s3.list_objects(Bucket=bucket, Prefix=key)

        if 'Contents' not in s3_data:
            raise ValueError(f"Results parquet not found in s3 at {baseline_path}")
        matching_files = [path['Key'] for path in s3_data['Contents']
                          if "up00.parquet" in path['Key'] or 'baseline' in path['Key']]

        if len(matching_files) > 1:
            raise ValueError(f"Multiple results parquet found in s3 at {baseline_path} for baseline. "
                             f"These files matched: {matching_files}")
        if len(matching_files) == 0:
            raise ValueError(f"No results parquet found in s3 at {baseline_path} for baseline. "
                             f"Here are the files: {[content['Key'] for content in s3_data['Contents']]}")

        self._aws_s3.download_file(bucket, matching_files[0], local_copy_path)
        return local_copy_path

    def get_results_csv_full(self) -> pd.DataFrame:
        """Returns the full results csv table. This is the same as get_results_csv without any restrictions. It uses
        the stored parquet files in s3 to download the results which is faster than querying athena.
        Returns:
            pd.DataFrame: The full results csv.
        """
        local_copy_path = self._download_results_csv()
        df = pd.read_parquet(local_copy_path)
        if df.index.name != self.bs_bldgid_column.name:
            df = df.set_index(self.bs_bldgid_column.name)
        return df

    @typing.overload
    def get_upgrades_csv(self, *, get_query_only: Literal[False] = False, upgrade_id: Union[int, str] = '0',
                         restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = Field(
        default_factory=list)
    ) -> pd.DataFrame:
        ...

    @typing.overload
    def get_upgrades_csv(self, *, get_query_only: Literal[True], upgrade_id: Union[int, str] = '0',
                         restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = Field(
        default_factory=list)
    ) -> str:
        ...

    @typing.overload
    def get_upgrades_csv(self, *, get_query_only: bool, upgrade_id: Union[int, str] = '0',
                         restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = Field(
        default_factory=list)
    ) -> Union[pd.DataFrame, str]:
        ...

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def get_upgrades_csv(self, *, upgrade_id: Union[str, int] = '0',
                         restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = Field(
            default_factory=list),
            get_query_only: bool = False) -> Union[pd.DataFrame, str]:
        """
        Returns the results_csv table for the BuildStock run for an upgrade.
        Args:
            upgrade_id: The upgrade for which to fetch the results. Defaults to '0' (baseline).
            restrict: The list of where conditions to restrict the results to. It should be specified as a list of
                      tuples. Example: `[('state',['VA','AZ']), ("build_existing_model.lighting",['60% CFL']), ...]`
            get_query_only: If set to true, returns the SQL query instead of the result.

        Returns:
            Pandas dataframe that is a subset of the upgrades csv, restricted by the provided conditions
        """
        restrict = list(restrict) if restrict else []
        query = sa.select(['*']).select_from(self.up_table)
        if upgrade_id:
            if self.up_table is None:
                raise ValueError("This run has no upgrades")
            query = query.where(self.up_table.c['upgrade'] == str(upgrade_id))

        query = self._add_restrict(query, restrict, bs_only=True)
        compiled_query = self._compile(query)
        if get_query_only:
            return compiled_query
        self._session_queries.add(compiled_query)
        if compiled_query in self._query_cache:
            return self._query_cache[compiled_query].copy().set_index(self.bs_bldgid_column.name)
        logger.info("Making results_csv query for upgrade ...")
        return self.execute(query).set_index(self.bs_bldgid_column.name)

    def _download_upgrades_csv(self, upgrade_id: int) -> str:
        """ Downloads the upgrades csv from s3 and returns the path to the downloaded file.
        """
        if self.up_table is None:
            raise ValueError("This run has no upgrades")

        available_upgrades = list(self.get_available_upgrades())
        available_upgrades.remove('0')
        if str(upgrade_id) not in available_upgrades:
            raise ValueError(f"Upgrade {upgrade_id} not found")

        local_copy_path = self.cache_folder / f"{self.db_name}_{self.up_table.name}_{upgrade_id}.parquet"
        if os.path.exists(local_copy_path):
            return local_copy_path

        if isinstance(self.table_name, str):
            db_table_name = f'{self.table_name}{self.db_schema.table_suffix.upgrades}'
        else:
            db_table_name = self.table_name[2]
        upgrades_path = self._aws_glue.get_table(DatabaseName=self.db_name,
                                                 Name=db_table_name)['Table']['StorageDescriptor']['Location']
        bucket = upgrades_path.split('/')[2]
        key = '/'.join(upgrades_path.split('/')[3:])
        s3_data = self._aws_s3.list_objects(Bucket=bucket, Prefix=key)

        if 'Contents' not in s3_data:
            raise ValueError(f"Results parquet not found in s3 at {upgrades_path}")
        # out of the contents find the key with name matching the pattern results_up{upgrade_id}.parquet
        matching_files = [path['Key'] for path in s3_data['Contents']
                          if f"up{upgrade_id:02}.parquet" in path['Key'] or
                          f"upgrade{upgrade_id:02}.parquet" in path['Key']]
        if len(matching_files) > 1:
            raise ValueError(f"Multiple results parquet found in s3 at {upgrades_path} for upgrade {upgrade_id}. "
                             f"These files matched: {matching_files}")
        if len(matching_files) == 0:
            raise ValueError(f"No results parquet found in s3 at {upgrades_path} for upgrade {upgrade_id}. "
                             f"Here are the files: {[content['Key'] for content in s3_data['Contents']]}")

        self._aws_s3.download_file(bucket, matching_files[0], local_copy_path)
        return local_copy_path

    def get_upgrades_csv_full(self, upgrade_id: int) -> pd.DataFrame:
        """ Returns the full results csv table for upgrades. This is the same as get_upgrades_csv without any
        restrictions. It uses the stored parquet files in s3 to download the results which is faster than querying
        athena.
        """
        local_copy_path = self._download_upgrades_csv(upgrade_id)
        df = pd.read_parquet(local_copy_path)
        if df.index.name != self.up_bldgid_column.name:
            df = df.set_index(self.up_bldgid_column.name)
        if 'upgrade' not in df.columns:
            df.insert(0, 'upgrade', upgrade_id)
        return df

    @typing.overload
    def get_building_ids(self, *,
                         restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = Field(
                             default_factory=list),
                         get_query_only: Literal[False] = False
                         ) -> pd.DataFrame:
        ...

    @typing.overload
    def get_building_ids(self, *,
                         restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = Field(
                             default_factory=list),
                         get_query_only: Literal[True]
                         ) -> str:
        ...

    @typing.overload
    def get_building_ids(self, *,
                         restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = Field(
                             default_factory=list),
                         get_query_only: bool
                         ) -> Union[pd.DataFrame, str]:
        ...

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def get_building_ids(self,
                         restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = Field(
                             default_factory=list),
                         get_query_only: bool = False
                         ) -> Union[str, pd.DataFrame]:
        """
        Returns the list of buildings based on the restrict list
        Args:
            restrict (List[Tuple[str, List]], optional): The list of where conditions to restrict the results to. It
                    should be specified as a list of tuples.
                    Example: `[('state',['VA','AZ']), ("build_existing_model.lighting",['60% CFL']), ...]`
            get_query_only (bool): If set to true, returns the query string instead of the result. Default is False.

        Returns:
            Pandas dataframe consisting of the building ids matching the restrict conditions.

        """
        restrict = list(restrict) if restrict else []
        query = sa.select(self.bs_bldgid_column)
        query = self._add_restrict(query, restrict, bs_only=True)
        if get_query_only:
            return self._compile(query)
        return self.execute(query)

    @typing.overload
    def _get_simulation_info(self, get_query_only: Literal[False] = False) -> SimInfo:
        ...

    @typing.overload
    def _get_simulation_info(self, get_query_only: Literal[True]) -> str:
        ...

    @validate_arguments(config=dict(smart_union=True))
    def _get_simulation_info(self, get_query_only: bool = False) -> Union[str, SimInfo]:
        # find the simulation time interval
        query0 = sa.select([self.ts_bldgid_column, self._ts_upgrade_col]).limit(1)  # get a building id and upgrade
        bldg_df = self.execute(query0)
        bldg_id = bldg_df.values[0][0]
        upgrade_id = bldg_df.values[0][1]
        query1 = sa.select([self.timestamp_column.distinct().label(
                            self.timestamp_column_name)]).where(self.ts_bldgid_column == bldg_id)
        if self.up_table is not None:
            query1 = query1.where(self._ts_upgrade_col == upgrade_id)
        query1 = query1.order_by(self.timestamp_column).limit(2)
        if get_query_only:
            return self._compile(query1)

        two_times = self.execute(query1)
        time1 = two_times[self.timestamp_column_name].iloc[0]
        time2 = two_times[self.timestamp_column_name].iloc[1]
        sim_year = time1.year
        reference_time = datetime(year=sim_year, month=1, day=1)
        sim_interval_seconds = int((time2 - time1).total_seconds())
        start_offset_seconds = int((time1 - reference_time).total_seconds())
        if sim_interval_seconds >= 28 * 24 * 60 * 60:  # 28 days or more means monthly resolution
            assert start_offset_seconds in [0, 31 * 24 * 60 * 60]
            interval = 1
            offset = start_offset_seconds // (31 * 24 * 60 * 60)
            unit = "month"
        else:
            interval = sim_interval_seconds
            offset = start_offset_seconds
            unit = "second"
        assert offset in [0, interval]
        return SimInfo(sim_year, interval, offset, unit)

    def _get_special_column(self,
                            column_type: Literal['month', 'day', 'hour', 'is_weekend', 'day_of_week']) -> DBColType:
        sim_info = self._get_simulation_info()
        if sim_info.offset > 0:
            # If timestamps are not period-beginning, shift them so we get proper values for the special columns.
            time_col = sa.func.date_add(sim_info.unit, -sim_info.offset, self.timestamp_column)
        else:
            time_col = self.timestamp_column

        if column_type == 'month':
            return sa.func.month(time_col).label('month')
        elif column_type == 'day':
            return sa.func.day(time_col).label('day')
        elif column_type == 'hour':
            return sa.func.hour(time_col).label('hour')
        elif column_type == 'day_of_week':
            return sa.func.day_of_week(time_col).label('day_of_week')
        elif column_type == 'is_weekend':
            return sa.cast(sa.func.day_of_week(time_col).in_([6, 7]), sa.Integer).label('is_weekend')
        else:
            assert_never(column_type)
            raise ValueError(f"Unknown special column type: {column_type}")

    def _get_gcol(self, column) -> DBColType:  # gcol => group by col
        """Get a DB column for the purpose of grouping. If the provided column doesn't exist as is,
        tries to get the column by prepending build_existing_model."""

        if isinstance(column, sa.Column):
            return column.label(self._simple_label(column.name))  # already a col

        if isinstance(column, SALabel):
            return column

        if isinstance(column, MappedColumn):
            return sa.literal(column).label(self._simple_label(column.name))

        if isinstance(column, tuple):
            try:
                return self._get_column(column[0]).label(column[1])
            except ValueError:
                new_name = f"{self._char_prefix}{column[0]}"
                return self._get_column(new_name).label(column[1])
        elif isinstance(column, str):
            try:
                return self._get_column(column).label(self._simple_label(column))
            except ValueError as e:
                if not column.startswith(self._char_prefix):
                    new_name = f"{self._char_prefix}{column}"
                    return self._get_column(new_name).label(column)
                raise ValueError(f"Invalid column name {column}") from e
        else:
            raise ValueError(f"Invalid column name type {column}: {type(column)}")

    def _get_enduse_cols(self, enduses: Sequence[AnyColType],
                         table='baseline') -> Sequence[DBColType]:
        tbls_dict = {'baseline': self.bs_table,
                     'upgrade': self.up_table,
                     'timeseries': self.ts_table}
        tbl = tbls_dict[table]
        enduse_cols: list[DBColType] = []
        for enduse in enduses:
            if isinstance(enduse, (sa.Column, SALabel)):
                enduse_cols.append(enduse)
            elif isinstance(enduse, str):
                try:
                    enduse_cols.append(tbl.c[enduse])
                except KeyError as err:
                    if table in ['baseline', 'upgrade']:
                        enduse_cols.append(tbl.c[f"{self._out_prefix}{enduse}"])
                    else:
                        raise ValueError(f"Invalid enduse column names for {table} table") from err
            elif isinstance(enduse, MappedColumn):
                enduse_cols.append(sa.literal(enduse).label(enduse.name))
            else:
                assert_never(enduse)
        return enduse_cols

    def get_groupby_cols(self) -> List[str]:
        """Find list of building characteristics that can be used for grouping.

        Returns:
            List[str]: List of building characteristics.
        """
        cols = {y.removeprefix(self._char_prefix) for y in self.bs_table.c.keys()
                if y.startswith(self._char_prefix)}
        return list(cols)

    def _validate_group_by(self, group_by: Sequence[Union[str, tuple[str, str]]]):
        valid_groupby_cols = self.get_groupby_cols()
        group_by_cols = [g[0] if isinstance(g, tuple) else g for g in group_by]
        if not set(group_by_cols).issubset(valid_groupby_cols):
            invalid_cols = ", ".join(f'"{x}"' for x in set(group_by_cols).difference(valid_groupby_cols))
            raise ValueError(f"The following are not valid columns in the database: {invalid_cols}")
        return group_by
        # TODO: intelligently select groupby columns order by cardinality (most to least groups) for
        # performance

    def get_available_upgrades(self) -> Sequence[str]:
        """Get the available upgrade scenarios and their identifier numbers.
        Returns:
            list: List of upgrades
        """
        return [str(u) for u in self.report.get_success_report().index]

    def _validate_upgrade(self, upgrade_id: Union[int, str]) -> str:
        upgrade_id = '0' if upgrade_id in (None, '0') else str(upgrade_id)
        available_upgrades = self.get_available_upgrades() or ['0']
        if upgrade_id not in set(available_upgrades):
            raise ValueError(f"`upgrade_id` = {upgrade_id} is not a valid upgrade."
                             "It doesn't exist or have no successful run")
        return str(upgrade_id)

    def _split_restrict(self, restrict):
        # Some cols like "state" might be available in both ts and bs table
        bs_restrict = []  # restrict to apply to baseline table
        ts_restrict = []  # restrict to apply to timeseries table
        for col, restrict_vals in restrict:
            if self.ts_table is not None and col in self.ts_table.columns:  # prioritize ts table
                ts_restrict.append([self.ts_table.c[col], restrict_vals])
            else:
                bs_restrict.append([self._get_gcol(col), restrict_vals])
        return bs_restrict, ts_restrict

    def _split_group_by(self, processed_group_by):
        # Some cols like "state" might be available in both ts and bs table
        ts_group_by = []  # group_by columns to apply to the timeseries table
        bs_group_by = []  # group_by columns to apply to the baseline table
        for g in processed_group_by:
            if self.ts_table is not None and g.name in self.ts_table.columns:
                ts_group_by.append(g)
            else:
                bs_group_by.append(g)
        return bs_group_by, ts_group_by

    def _clean_group_by(self, group_by):
        """
        :param group_by: The group_by list
        :return: cleaned version of group_by
        Sometimes, it is necessary to include the table name in the group_by column. For example, a group_by could be
        ['time', '"res_national_53_2018_baseline"."build_existing_model.state"']. This is necessary if another table
        (such as a correction factors table) has the same column ("build_existing_model.state") as the baseline
        table. However, the query result will not include the table name in columns, so it is necessary to transform
        the group_by to a cleaner version (['time', 'build_existing_model.state']).
        At other times, quotes are used in group_by columns, such as ['"time"'], but the query result will not contain
        the quotes, so it is necessary to remove them.
        Finally, a group_by column may be specified as a tuple of a column and an alias. For example, group_by can
        contain [('month(time)', 'MOY')]; in this case, we want to convert it into just 'MOY' since that is what will
        be present in the returned query.
        """
        new_group_by = []
        for col in group_by:
            if isinstance(col, tuple):
                new_group_by.append(col[1])
                continue

            if match := re.search(r'"[\w\.]*"\."([\w\.]*)"', col) or re.search(r'"([\w\.]*)"', col):
                new_group_by.append(match.group(1))
            else:
                new_group_by.append(col)
        return new_group_by

    def _process_groupby_cols(self, group_by, annual_only=False):
        if not group_by:
            return []
        if annual_only:
            new_group_by = []
            for entry in group_by:
                if isinstance(entry, str) and not entry.startswith(self._char_prefix):
                    new_group_by.append(f"{self._char_prefix}{entry}")
                elif isinstance(entry, tuple) and not entry[0].startswith(self._char_prefix):
                    new_group_by.append((f"{self._char_prefix}{entry[0]}", entry[1]))
                else:
                    new_group_by.append(entry)
            group_by = new_group_by
        return [self._get_gcol(entry) for entry in group_by]

    def _get_simulation_timesteps_count(self):
        # count the number of timesteps per building
        query = sa.select([self.ts_bldgid_column, safunc.sum(1).label('count')])
        query = query.group_by(self.ts_bldgid_column)
        sim_timesteps_count = self.execute(query)
        bld0_step_count = sim_timesteps_count['count'].iloc[0]
        n_buildings_with_same_count = sum(sim_timesteps_count['count'] == bld0_step_count)
        if n_buildings_with_same_count != len(sim_timesteps_count):
            logger.warning("Not all buildings have the same number of timestamps. This can cause wrong"
                           "scaled_units_count and other problems.")

        return bld0_step_count

    @typing.overload
    def get_buildings_by_locations(self, location_col: str, locations: List[str],
                                   get_query_only: Literal[False] = False) -> pd.DataFrame:
        ...

    @typing.overload
    def get_buildings_by_locations(self, location_col: str, locations: List[str],
                                   get_query_only: Literal[True]) -> str:
        ...

    @typing.overload
    def get_buildings_by_locations(self, location_col: str, locations: List[str],
                                   get_query_only: bool) -> Union[str, pd.DataFrame]:
        ...

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def get_buildings_by_locations(self, location_col: str, locations: List[str],
                                   get_query_only: bool = False) -> Union[str, pd.DataFrame]:
        """
        Returns the list of buildings belonging to the given list of locations.
        Args:
            location_col: The column to match against, e.g. "build_existing_model.county".
            locations: The list of location values (strings) to match in location_col.
            get_query_only: If set to true, returns the query string instead of the result

        Returns:
            Pandas dataframe consisting of the building ids belonging to the provided list of locations.

        """
        query = sa.select([self.bs_bldgid_column])
        query = query.where(self._get_column(location_col).in_(locations))
        query = self._add_order_by(query, [self.bs_bldgid_column])
        if get_query_only:
            return self._compile(query)
        res = self.execute(query)
        return res

    @property
    def _bs_completed_status_col(self):
        if not isinstance(self.bs_table.c[self.db_schema.column_names.completed_status].type, sqltypes.String):
            return sa.cast(self.bs_table.c[self.db_schema.column_names.completed_status],
                           sa.String).label('completed_status')
        else:
            return self.bs_table.c[self.db_schema.column_names.completed_status]

    @property
    def _up_completed_status_col(self):
        if self.up_table is None:
            raise ValueError("No upgrades table")
        if not isinstance(self.up_table.c[self.db_schema.column_names.completed_status].type, sqltypes.String):
            return sa.cast(self.up_table.c[self.db_schema.column_names.completed_status],
                           sa.String).label('completed_status')
        else:
            return self.up_table.c[self.db_schema.column_names.completed_status]

    @property
    def _bs_successful_condition(self):
        return self._bs_completed_status_col == self.db_schema.completion_values.success

    @property
    def _up_successful_condition(self):
        return self._up_completed_status_col == self.db_schema.completion_values.success

    @property
    def _ts_upgrade_col(self):
        if not isinstance(self.ts_table.c['upgrade'].type, sqltypes.String):
            return sa.cast(self.ts_table.c['upgrade'], sa.String).label('upgrade')
        else:
            return self.ts_table.c['upgrade']

    @property
    def _up_upgrade_col(self):
        if self.up_table is None:
            raise ValueError("No upgrades table")
        if not isinstance(self.up_table.c['upgrade'].type, sqltypes.String):
            return sa.cast(self.up_table.c['upgrade'], sa.String).label('upgrade')
        else:
            return self.up_table.c['upgrade']

    def _get_completed_status_col(self, table: AnyTableType):
        if not isinstance(table.c[self.db_schema.column_names.completed_status].type, sqltypes.String):
            return sa.cast(table.c[self.db_schema.column_names.completed_status],
                           sa.String).label('completed_status')
        else:
            return table.c[self.db_schema.column_names.completed_status]

    def _get_success_condition(self, table: AnyTableType):
        return self._get_completed_status_col(table) == self.db_schema.completion_values.success

Ancestors

buildstock_query.query_core.QueryCore

Instance variables

var agg

BuildStockAggregate object to perform aggregate queries

var report

BuildStockReport object to perform report queries

var savings

BuildStockSavings object to perform savings queries

var utility

BuildStockUtility object to perform utility queries

Methods

def get_available_upgrades(self) ‑> Sequence[str]

Get the available upgrade scenarios and their identifier numbers.

Returns

list
List of upgrades
Expand source code
def get_available_upgrades(self) -> Sequence[str]:
    """Get the available upgrade scenarios and their identifier numbers.
    Returns:
        list: List of upgrades
    """
    return [str(u) for u in self.report.get_success_report().index]
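
A usage sketch (the returned identifiers are illustrative; '0' denotes the baseline):

upgrades = bsq.get_available_upgrades()  # e.g. ['0', '1', '2']
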
def get_building_ids(self, restrict: Sequence[tuple[Union[sqlalchemy.sql.elements.Label, sqlalchemy.sql.schema.Column, str, MappedColumn], Union[str, int, Sequence[Union[int, str]]]]] = FieldInfo(default=PydanticUndefined, default_factory=<class 'list'>, extra={}), get_query_only: bool = False) ‑> Union[str, pandas.core.frame.DataFrame]

Returns the list of buildings based on the restrict list

Args

restrict : List[Tuple[str, List]], optional
The list of where conditions to restrict the results to. It should be specified as a list of tuples. Example: [('state',['VA','AZ']), ("build_existing_model.lighting",['60% CFL']), ...]
get_query_only : bool
If set to true, returns the query string instead of the result. Default is False.

Returns

Pandas dataframe consisting of the building ids matching the restrict conditions.

Expand source code
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def get_building_ids(self,
                     restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = Field(
                         default_factory=list),
                     get_query_only: bool = False
                     ) -> Union[str, pd.DataFrame]:
    """
    Returns the list of buildings based on the restrict list
    Args:
        restrict (List[Tuple[str, List]], optional): The list of where conditions to restrict the results to. It
                should be specified as a list of tuples.
                Example: `[('state',['VA','AZ']), ("build_existing_model.lighting",['60% CFL']), ...]`
        get_query_only (bool): If set to true, returns the query string instead of the result. Default is False.

    Returns:
        Pandas dataframe consisting of the building ids matching the restrict conditions.

    """
    restrict = list(restrict) if restrict else []
    query = sa.select(self.bs_bldgid_column)
    query = self._add_restrict(query, restrict, bs_only=True)
    if get_query_only:
        return self._compile(query)
    return self.execute(query)
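
A usage sketch with hypothetical restrict values; the valid column names depend on the characteristics in your run:

bldg_df = bsq.get_building_ids(restrict=[('state', ['VA', 'AZ']),
                                         ('build_existing_model.lighting', ['60% CFL'])])
sql = bsq.get_building_ids(restrict=[('state', ['VA'])], get_query_only=True)  # SQL string only
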
def get_buildings_by_locations(self, location_col: str, locations: List[str], get_query_only: bool = False) ‑> Union[str, pandas.core.frame.DataFrame]

Returns the list of buildings belonging to the given list of locations.

Args

location_col
The column to match against, e.g. "build_existing_model.county".
locations
The list of location values (strings) to match in location_col.
get_query_only
If set to true, returns the query string instead of the result

Returns

Pandas dataframe consisting of the building ids belonging to the provided list of locations.

Expand source code
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def get_buildings_by_locations(self, location_col: str, locations: List[str],
                               get_query_only: bool = False) -> Union[str, pd.DataFrame]:
    """
    Returns the list of buildings belonging to the given list of locations.
    Args:
        location_col: The column to match against, e.g. "build_existing_model.county".
        locations: The list of location values (strings) to match in location_col.
        get_query_only: If set to true, returns the query string instead of the result

    Returns:
        Pandas dataframe consisting of the building ids belonging to the provided list of locations.

    """
    query = sa.select([self.bs_bldgid_column])
    query = query.where(self._get_column(location_col).in_(locations))
    query = self._add_order_by(query, [self.bs_bldgid_column])
    if get_query_only:
        return self._compile(query)
    res = self.execute(query)
    return res
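
A usage sketch with a hypothetical column and location values:

county_bldgs = bsq.get_buildings_by_locations(location_col='build_existing_model.county',
                                              locations=['County A', 'County B'])
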
def get_buildstock_df(self) ‑> pandas.core.frame.DataFrame

Returns the building characteristics data by querying Athena tables using the same format as that produced by the sampler and written as buildstock.csv. It only includes buildings with successful simulations.

Returns

pd.DataFrame
The buildstock.csv dataframe.
Expand source code
def get_buildstock_df(self) -> pd.DataFrame:
    """Returns the building characteristics data by quering Athena tables using the same format as that produced
    by the sampler and written as buildstock.csv. It only includes buildings with successful simulation.
    Returns:
        pd.DataFrame: The buildstock.csv dataframe.
    """
    results_df = self.get_results_csv_full()
    results_df = results_df[results_df[self.db_schema.column_names.completed_status].astype(str).str.lower() ==
                            self.db_schema.completion_values.success.lower()]
    buildstock_cols = [c for c in results_df.columns if c.startswith(self._char_prefix)]
    buildstock_df = results_df[buildstock_cols]
    buildstock_cols = [''.join(c.split(".")[1:]).replace("_", " ") for c in buildstock_df.columns
                       if c.startswith(self._char_prefix)]
    buildstock_df.columns = buildstock_cols
    return buildstock_df
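
As a sketch of the column naming, the characteristic prefix (e.g. build_existing_model.) is stripped and underscores become spaces, matching the sampler's buildstock.csv format:

buildstock_df = bsq.get_buildstock_df()
# e.g. the results column 'build_existing_model.geometry_floor_area'
# appears here as 'geometry floor area'
buildstock_df.to_csv('buildstock.csv')  # hypothetical output path
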
def get_distinct_count(self, column: str, table_name: Optional[str] = None, get_query_only: bool = False) ‑> Union[str, pandas.core.frame.DataFrame]

Find distinct counts.

Args

column : str
The column in the table for which distinct counts are needed.
table_name : str, optional
The table in Athena. Defaults to baseline table.
get_query_only : bool, optional
If true, only returns the SQL query. Defaults to False.

Returns

pd.DataFrame
The distinct counts.
Expand source code
@validate_arguments(config=dict(smart_union=True))
def get_distinct_count(self, column: str, table_name: Optional[str] = None,
                       get_query_only: bool = False) -> Union[pd.DataFrame, str]:
    """
        Find distinct counts.
    Args:
        column (str): The column in the table for which distinct counts are needed.
        table_name (str, optional): The table in Athena. Defaults to baseline table.
        get_query_only (bool, optional): If true, only returns the SQL query. Defaults to False.

    Returns:
        pd.DataFrame: The distinct counts.
    """
    tbl = self.bs_table if table_name is None else self._get_table(table_name)
    query = sa.select([tbl.c[column], safunc.sum(1).label("sample_count"),
                       safunc.sum(self.sample_wt).label("weighted_count")])
    query = query.group_by(tbl.c[column]).order_by(tbl.c[column])
    if get_query_only:
        return self._compile(query)

    r = self.execute(query, run_async=False)
    return r
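
A usage sketch with a hypothetical column name; the result has one row per distinct value, with sample_count and weighted_count columns:

counts_df = bsq.get_distinct_count(column='build_existing_model.state')
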
def get_distinct_vals(self, column: str, table_name: Optional[str] = None, get_query_only: bool = False) ‑> Union[str, pandas.core.series.Series]

Find distinct vals.

Args

column : str
The column in the table for which distinct vals are needed.
table_name : str, optional
The table in Athena. Defaults to baseline table.
get_query_only : bool, optional
If true, only returns the SQL query. Defaults to False.

Returns

pd.Series
The distinct vals.
Expand source code
@validate_arguments(config=dict(smart_union=True))
def get_distinct_vals(self, column: str, table_name: Optional[str] = None,
                      get_query_only: bool = False) -> Union[str, pd.Series]:
    """
        Find distinct vals.
    Args:
        column (str): The column in the table for which distinct vals are needed.
        table_name (str, optional): The table in Athena. Defaults to baseline table.
        get_query_only (bool, optional): If true, only returns the SQL query. Defaults to False.

    Returns:
        pd.Series: The distinct vals.
    """
    table_name = self.bs_table.name if table_name is None else table_name
    tbl = self._get_table(table_name)
    query = sa.select(tbl.c[column]).distinct()
    if get_query_only:
        return self._compile(query)

    r = self.execute(query, run_async=False)
    return r[column]
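
A usage sketch with a hypothetical column name; passing table_name=None uses the baseline table:

states = bsq.get_distinct_vals(column='build_existing_model.state', table_name=None)
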
def get_groupby_cols(self) ‑> List[str]

Find list of building characteristics that can be used for grouping.

Returns

List[str]
List of building characteristics.
Expand source code
def get_groupby_cols(self) -> List[str]:
    """Find list of building characteristics that can be used for grouping.

    Returns:
        List[str]: List of building characteristics.
    """
    cols = {y.removeprefix(self._char_prefix) for y in self.bs_table.c.keys()
            if y.startswith(self._char_prefix)}
    return list(cols)
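
A usage sketch; the returned names are the characteristic columns with the prefix (e.g. build_existing_model.) removed:

group_cols = bsq.get_groupby_cols()  # e.g. ['state', 'geometry_floor_area', ...] (illustrative)
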
def get_results_csv(self, restrict: Sequence[tuple[Union[sqlalchemy.sql.elements.Label, sqlalchemy.sql.schema.Column, str, MappedColumn], Union[str, int, Sequence[Union[int, str]]]]] = FieldInfo(default=PydanticUndefined, default_factory=<class 'list'>, extra={}), get_query_only: bool = False) ‑> Union[str, pandas.core.frame.DataFrame]

Returns the results_csv table for the BuildStock run

Args

restrict : List[Tuple[str, Union[List, str, int]]], optional
The list of where conditions to restrict the results to. It should be specified as a list of tuples. Example: [('state',['VA','AZ']), ("build_existing_model.lighting",['60% CFL']), ...]
get_query_only : bool
If set to true, returns the SQL query instead of the result.

Returns

Pandas dataframe that is a subset of the results csv, restricted by the provided conditions

Expand source code
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def get_results_csv(self,
                    restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = Field(
                        default_factory=list),
                    get_query_only: bool = False) -> Union[pd.DataFrame, str]:
    """
    Returns the results_csv table for the BuildStock run
    Args:
        restrict (List[Tuple[str, Union[List, str, int]]], optional): The list of where conditions to restrict the
            results to. It should be specified as a list of tuples.
                  Example: `[('state',['VA','AZ']), ("build_existing_model.lighting",['60% CFL']), ...]`
        get_query_only (bool): If set to true, returns the SQL query instead of the result.

    Returns:
        Pandas dataframe that is a subset of the results csv, restricted by the provided conditions
    """
    restrict = list(restrict) if restrict else []
    query = sa.select(['*']).select_from(self.bs_table)
    query = self._add_restrict(query, restrict, bs_only=True)
    compiled_query = self._compile(query)
    if get_query_only:
        return compiled_query
    self._session_queries.add(compiled_query)
    if compiled_query in self._query_cache:
        return self._query_cache[compiled_query].copy().set_index(self.bs_bldgid_column.name)
    logger.info("Making results_csv query ...")
    result = self.execute(query)
    return result.set_index(self.bs_bldgid_column.name)
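A hedged example (the restriction column and values are illustrative):

```
# Subset of the results csv for two states
df = bsq.get_results_csv(restrict=[("build_existing_model.state", ["VA", "AZ"])])

# Or get the SQL without running it
sql = bsq.get_results_csv(get_query_only=True)
```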
def get_results_csv_full(self) ‑> pandas.core.frame.DataFrame

Returns the full results csv table. This is the same as get_results_csv without any restrictions. It uses the parquet files stored in S3 to download the results, which is faster than querying Athena.

Returns

pd.DataFrame
The full results csv.
Expand source code
def get_results_csv_full(self) -> pd.DataFrame:
    """Returns the full results csv table. This is the same as get_results_csv without any restrictions. It uses
    the stored parquet files in s3 to download the results which is faster than querying athena.
    Returns:
        pd.DataFrame: The full results csv.
    """
    local_copy_path = self._download_results_csv()
    df = pd.read_parquet(local_copy_path)
    if df.index.name != self.bs_bldgid_column.name:
        df = df.set_index(self.bs_bldgid_column.name)
    return df
def get_upgrades_analyzer(self, yaml_file: str, opt_sat_file: str) ‑> UpgradesAnalyzer

Returns the UpgradesAnalyzer object with buildstock.csv downloaded from Athena (see get_buildstock_df help)

Args

yaml_file : str
The path to the buildstock configuration file.
opt_sat_file : str
The path to the opt_saturation.csv file for the housing characteristics.

Returns

UpgradesAnalyzer
The UpgradesAnalyzer object. See UpgradesAnalyzer.
Expand source code
@validate_arguments
def get_upgrades_analyzer(self, yaml_file: str, opt_sat_file: str) -> UpgradesAnalyzer:
    """
        Returns the UpgradesAnalyzer object with buildstock.csv downloaded from Athena (see get_buildstock_df help)

    Args:
        yaml_file (str): The path to the buildstock configuration file.
        opt_sat_file (str): The path to the opt_saturation.csv file for the housing characteristics.

    Returns:
        UpgradesAnalyzer: The UpgradesAnalyzer object. See UpgradesAnalyzer.
    """

    buildstock_df = self.get_buildstock_df()
    ua = UpgradesAnalyzer(buildstock=buildstock_df, yaml_file=yaml_file, opt_sat_file=opt_sat_file)
    return ua
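For example (the file paths are illustrative):

```
# Builds the analyzer from the buildstock downloaded via Athena
ua = bsq.get_upgrades_analyzer(yaml_file="my_project.yml",
                               opt_sat_file="opt_saturation.csv")
```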
def get_upgrades_csv(self, *, upgrade_id: Union[str, int] = '0', restrict: Sequence[tuple[Union[sqlalchemy.sql.elements.Label, sqlalchemy.sql.schema.Column, str, MappedColumn], Union[str, int, Sequence[Union[int, str]]]]] = Field(default_factory=list), get_query_only: bool = False) ‑> Union[str, pandas.core.frame.DataFrame]

Returns the results_csv table for the BuildStock run for an upgrade.

Args

upgrade_id
The upgrade for which to fetch the results csv. Defaults to '0', i.e. the baseline.
restrict
The list of where conditions to restrict the results to, specified as a list of tuples. Example: [('state',['VA','AZ']), ("build_existing_model.lighting",['60% CFL']), ...]
get_query_only
If set to true, returns the SQL query instead of the result.

Returns

Pandas dataframe that is a subset of the results csv, restricted by the provided conditions

Expand source code
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def get_upgrades_csv(self, *, upgrade_id: Union[str, int] = '0',
                     restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = Field(
        default_factory=list),
        get_query_only: bool = False) -> Union[pd.DataFrame, str]:
    """
    Returns the results_csv table for the BuildStock run for an upgrade.
    Args:
        restrict: The list of where condition to restrict the results to. It should be specified as a list of tuple.
                  Example: `[('state',['VA','AZ']), ("build_existing_model.lighting",['60% CFL']), ...]`
        get_query_only: If set to true, returns the list of queries to run instead of the result.

    Returns:
        Pandas dataframe that is a subset of the results csv, that belongs to provided list of utilities
    """
    restrict = list(restrict) if restrict else []
    if self.up_table is None:
        raise ValueError("This run has no upgrades")
    query = sa.select(['*']).select_from(self.up_table)
    if upgrade_id:
        query = query.where(self.up_table.c['upgrade'] == str(upgrade_id))

    query = self._add_restrict(query, restrict, bs_only=True)
    compiled_query = self._compile(query)
    if get_query_only:
        return compiled_query
    self._session_queries.add(compiled_query)
    if compiled_query in self._query_cache:
        return self._query_cache[compiled_query].copy().set_index(self.bs_bldgid_column.name)
    logger.info("Making results_csv query for upgrade ...")
    return self.execute(query).set_index(self.bs_bldgid_column.name)
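For example (the upgrade id and restriction are illustrative):

```
# Results for upgrade 1, restricted to one state
up_df = bsq.get_upgrades_csv(upgrade_id=1,
                             restrict=[("build_existing_model.state", ["VA"])])
```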
def get_upgrades_csv_full(self, upgrade_id: int) ‑> pandas.core.frame.DataFrame

Returns the full results csv table for upgrades. This is the same as get_upgrades_csv without any restrictions. It uses the parquet files stored in S3 to download the results, which is faster than querying Athena.

Expand source code
def get_upgrades_csv_full(self, upgrade_id: int) -> pd.DataFrame:
    """ Returns the full results csv table for upgrades. This is the same as get_upgrades_csv without any
    restrictions. It uses the stored parquet files in s3 to download the results which is faster than querying
    athena.
    """
    local_copy_path = self._download_upgrades_csv(upgrade_id)
    df = pd.read_parquet(local_copy_path)
    if df.index.name != self.up_bldgid_column.name:
        df = df.set_index(self.up_bldgid_column.name)
    if 'upgrade' not in df.columns:
        df.insert(0, 'upgrade', upgrade_id)
    return df

Inherited members

class MappedColumn (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class MappedColumn(BaseModel):
    bsq: Any  # BuildStockQuery
    name: str
    mapping_dict: dict
    key: Union[Union[DBColType, str], Sequence[Union[DBColType, str]]]

    class Config:
        arbitrary_types_allowed = True
        smart_union = True

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var Config
var bsq : Any
var key : Union[sqlalchemy.sql.elements.Label, sqlalchemy.sql.schema.Column, str, Sequence[Union[sqlalchemy.sql.elements.Label, sqlalchemy.sql.schema.Column, str]]]
var mapping_dict : dict
var name : str
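A construction sketch (the mapping and key below are illustrative, not part of the library; how the mapped column is consumed by downstream queries is not shown here):

```
# Map state values to a derived 'census_region' column for use in queries
region_col = MappedColumn(
    bsq=bsq,                                     # the BuildStockQuery object
    name="census_region",                        # name for the derived column
    mapping_dict={"VA": "South", "AZ": "West"},  # source value -> mapped value
    key="build_existing_model.state",            # column whose values are mapped
)
```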
class UpgradesAnalyzer (yaml_file: str, buildstock: Union[str, pandas.core.frame.DataFrame], opt_sat_file: str)

Analyze the apply logic for various upgrades in the project yaml file.

Initialize the analyzer instance.

Args

yaml_file : str
The path to the yaml file.
buildstock : Union[str, pd.DataFrame]
Either the buildstock dataframe, or path to the csv
opt_sat_file : str
The path to the option saturation file.
Expand source code
class UpgradesAnalyzer:
    """
    Analyze the apply logic for various upgrades in the project yaml file.
    """

    def __init__(self, yaml_file: str,
                 buildstock: Union[str, pd.DataFrame],
                 opt_sat_file: str) -> None:
        """
        Initialize the analyzer instance.
        Args:
            yaml_file (str): The path to the yaml file.
            buildstock (Union[str, pd.DataFrame]): Either the buildstock dataframe, or path to the csv
            opt_sat_file (str): The path to the option saturation file.
        """
        self.parser = LogicParser(opt_sat_file, yaml_file)
        self.yaml_file = yaml_file
        if isinstance(buildstock, str):
            self.buildstock_df_original = read_csv(buildstock, dtype=str)
            self.buildstock_df = self.buildstock_df_original.copy()
            self.buildstock_df.columns = [c.lower() for c in self.buildstock_df.columns]
            self.buildstock_df.rename(columns={"building": "building_id"}, inplace=True)
            self.buildstock_df.set_index("building_id", inplace=True)
        elif isinstance(buildstock, pd.DataFrame):
            self.buildstock_df_original = buildstock.copy()
            self.buildstock_df = buildstock.reset_index().rename(columns=str.lower)
            self.buildstock_df.rename(columns={"building": "building_id"}, inplace=True)
            if "building_id" in self.buildstock_df.columns:
                self.buildstock_df.set_index("building_id", inplace=True)
            self.buildstock_df = self.buildstock_df.astype(str)
        self.total_samples = len(self.buildstock_df)
        self._logic_cache: dict = {}

    def get_cfg(self) -> dict:
        """Get the buildstock configuration file as a dictionary object.

        Returns:
            dict: The buildstock configuration file.
        """
        with open(self.yaml_file) as f:
            config = yaml.load(f, Loader=yaml.SafeLoader)
        return config

    @staticmethod
    def _get_eq_str(condition):
        para, option = UpgradesAnalyzer._get_para_option(condition)
        return f"`{para.lower()}`=='{option}'"

    @staticmethod
    def _get_para_option(condition):
        try:
            para, option = condition.split("|")
        except ValueError as e:
            raise ValueError(f"Condition {condition} is invalid") from e
        return para.lower(), option

    @staticmethod
    def get_mentioned_parameters(logic: Union[list, dict, str]) -> list:
        """
        Returns the list of all parameters referenced in a logic block. Useful for debugging

        Args:
            logic ( Union[list, dict, str]): The apply logic

        Raises:
            ValueError: If the input logic is invalid

        Returns:
            List: The list of parameters
        """
        if not logic:
            return []

        if isinstance(logic, str):
            return [UpgradesAnalyzer._get_para_option(logic)[0]]
        elif isinstance(logic, list):
            all_params = []
            for el in logic:
                all_params.extend(UpgradesAnalyzer.get_mentioned_parameters(el))
            return list(dict.fromkeys(all_params))  # remove duplicates while maintaining order
        elif isinstance(logic, dict):
            return UpgradesAnalyzer.get_mentioned_parameters(list(logic.values())[0])
        else:
            raise ValueError("Invalid logic type")

    def print_unique_characteristic(self, upgrade_num: int, name: str, base_bldg_list: list, compare_bldg_list: list):
        """Finds and prints what's unique among a list of buildings compared to baseline buildings.
           Useful for debugging why a certain set of buildings' energy consumption went up for an upgrade, for example.
        Args:
            upgrade_num (int): The upgrade for which the analysis is being done.
            name (str): Some name to identify the building set (only used for printing)
            base_bldg_list (list): The set of 'normal' building ids to compare against.
            compare_bldg_list (list): The set of buildings whose unique characteristics are to be printed.
        """
        cfg = self.get_cfg()
        if upgrade_num == 0:
            raise ValueError(f"Upgrades are 1-indexed. Got {upgrade_num}")

        try:
            upgrade_cfg = cfg["upgrades"][upgrade_num - 1]
        except (KeyError, IndexError) as e:
            raise ValueError(f"Invalid upgrade {upgrade_num}. Upgrades are 1-indexed, FYI.") from e

        parameter_list = []
        for option_cfg in upgrade_cfg["options"]:
            parameter_list.append(UpgradesAnalyzer._get_para_option(option_cfg["option"])[0])
            parameter_list.extend(UpgradesAnalyzer.get_mentioned_parameters(option_cfg.get("apply_logic")))
        res_df = self.buildstock_df
        # remove duplicates (dict.fromkeys) and remove parameters not existing in buildstock_df
        parameter_list = [param for param in dict.fromkeys(parameter_list) if param in res_df.columns]
        compare_df = res_df.loc[compare_bldg_list]
        base_df = res_df.loc[base_bldg_list]
        print(f"Comparing {len(compare_df)} buildings with {len(base_df)} other buildings.")
        unique_vals_dict: dict[tuple[str, ...], set[tuple[str, ...]]] = {}
        for col in res_df.columns:
            no_change_set = set(compare_df[col].fillna("").unique())
            other_set = set(base_df[col].fillna("").unique())
            if only_in_no_change := no_change_set - other_set:
                print(f"Only {name} buildings have {col} in {sorted(only_in_no_change)}")
                unique_vals_dict[(col,)] = {(entry,) for entry in only_in_no_change}

        if not unique_vals_dict:
            print("No 1-column unique chracteristics found.")

        for combi_size in range(2, min(len(parameter_list) + 1, 5)):
            print(f"Checking {combi_size} column combinations out of {parameter_list}")
            found_uniq_chars = 0
            for cols in combinations(parameter_list, combi_size):
                compare_tups = compare_df[list(cols)].fillna("").drop_duplicates().itertuples(index=False, name=None)
                other_tups = base_df[list(cols)].fillna("").drop_duplicates().itertuples(index=False, name=None)
                only_in_compare = set(compare_tups) - set(other_tups)

                # remove cases arising from uniqueness already found with a smaller subset of cols
                for sub_combi_size in range(1, len(cols)):
                    for sub_cols in combinations(cols, sub_combi_size):
                        if sub_cols in unique_vals_dict:
                            new_set = set()
                            for val in only_in_compare:
                                relevant_val = tuple(val[cols.index(sub_col)] for sub_col in sub_cols)
                                if relevant_val not in unique_vals_dict[sub_cols]:
                                    new_set.add(val)
                            only_in_compare = new_set

                if only_in_compare:
                    print(f"Only {name} buildings have {cols} in {sorted(only_in_compare)} \n")
                    found_uniq_chars += 1
                    unique_vals_dict[cols] = only_in_compare

            if not found_uniq_chars:
                print(f"No {combi_size}-column unique chracteristics found.")

    def _reduce_logic(self, logic, parent=None):
        cache_key = str(logic) if parent is None else parent + "[" + str(logic) + "]"
        if cache_key in self._logic_cache:
            return self._logic_cache[cache_key]

        logic_array = np.ones((1, self.total_samples), dtype=bool)
        if parent not in [None, "and", "or", "not"]:
            raise ValueError(f"Logic can only inlcude and, or, not blocks. {parent} found in {logic}.")

        if isinstance(logic, str):
            para, opt = UpgradesAnalyzer._get_para_option(logic)
            logic_array = self.buildstock_df[para] == opt
        elif isinstance(logic, list):
            if len(logic) == 1:
                logic_array = self._reduce_logic(logic[0]).copy()
            elif parent in ["or"]:
                logic_array = reduce(
                    lambda l1, l2: l1 | self._reduce_logic(l2),
                    logic,
                    np.zeros((1, self.total_samples), dtype=bool),
                )
            else:
                logic_array = reduce(
                    lambda l1, l2: l1 & self._reduce_logic(l2),
                    logic,
                    np.ones((1, self.total_samples), dtype=bool),
                )
        elif isinstance(logic, dict):
            if len(logic) > 1:
                raise ValueError(f"Dicts cannot have more than one keys. {logic} has.")
            key = list(logic.keys())[0]
            logic_array = self._reduce_logic(logic[key], parent=key).copy()

        if parent == "not":
            return ~logic_array
        if not (isinstance(logic, str) or (isinstance(logic, list) and len(logic) == 1)):
            # Don't cache small logics - computing them again won't be too bad
            self._logic_cache[cache_key] = logic_array.copy()
        return logic_array

    def get_report(self, upgrade_num: Optional[int] = None) -> pd.DataFrame:
        """Analyses how many buildings various options in all the upgrades is going to apply to and returns
        a report in DataFrame format.
        Args:
            upgrade_num: Numeric index of upgrade (1-indexed). If None, all upgrades are assessed

        Returns:
            pd.DataFrame: The upgrade and options report.

        """

        def _get_records(indx, upgrade):
            records = []
            logger.info(f"Analyzing upgrade {indx + 1}")
            all_applied_bldgs = np.zeros((1, self.total_samples), dtype=bool)
            package_applied_bldgs = np.ones((1, self.total_samples), dtype=bool)
            if "package_apply_logic" in upgrade:
                pkg_flat_logic = UpgradesAnalyzer._normalize_lists(upgrade["package_apply_logic"])
                package_applied_bldgs = self._reduce_logic(pkg_flat_logic, parent=None)

            for opt_index, option in enumerate(upgrade["options"]):
                applied_bldgs = np.ones((1, self.total_samples), dtype=bool)
                if "apply_logic" in option:
                    flat_logic = UpgradesAnalyzer._normalize_lists(option["apply_logic"])
                    applied_bldgs &= self._reduce_logic(flat_logic, parent=None)
                else:
                    applied_bldgs = np.ones((1, self.total_samples), dtype=bool)

                applied_bldgs &= package_applied_bldgs
                count = applied_bldgs.sum()
                all_applied_bldgs |= applied_bldgs
                record = {
                    "upgrade": indx + 1,
                    "upgrade_name": upgrade["upgrade_name"],
                    "option_num": opt_index + 1,
                    "option": option["option"],
                    "applicable_to": count,
                    "applicable_percent": self._to_pct(count),
                    "applicable_buildings": set(self.buildstock_df.loc[applied_bldgs[0]].index),
                }
                records.append(record)

            count = all_applied_bldgs.sum()
            record = {
                "upgrade": indx + 1,
                "upgrade_name": upgrade["upgrade_name"],
                "option_num": -1,
                "option": "All",
                "applicable_to": count,
                "applicable_buildings": set(self.buildstock_df.loc[all_applied_bldgs[0]].index),
                "applicable_percent": self._to_pct(count),
            }
            records.append(record)
            return records

        cfg = self.get_cfg()
        self._logic_cache = {}
        if "upgrades" not in cfg:
            raise ValueError("The project yaml has no upgrades defined")

        max_upg = len(cfg["upgrades"]) + 1
        if upgrade_num is not None:
            if upgrade_num <= 0 or upgrade_num >= max_upg:
                raise ValueError(f"Invalid upgrade {upgrade_num}. Valid upgrade_num = {list(range(1, max_upg))}.")

        records = []
        for indx, upgrade in enumerate(cfg["upgrades"]):
            if upgrade_num is None or upgrade_num == indx + 1:
                records += _get_records(indx, upgrade)
            else:
                continue

        report_df = pd.DataFrame.from_records(records)
        return report_df

    def get_upgraded_buildstock(self, upgrade_num):
        """Returns a copy of the buildstock dataframe with the given upgrade applied, i.e. with the
        applicable parameters replaced by their upgrade options.
        Args:
            upgrade_num: The 1-indexed upgrade to apply.
        """
        report_df = self.get_report(upgrade_num)
        upgrade_name = report_df["upgrade_name"].unique()[0]
        logger.info(f" * Upgraded buildstock for upgrade {upgrade_num} : {upgrade_name}")

        df = self.buildstock_df_original.copy()
        for idx, row in report_df.iterrows():
            if row["option"] == "All":
                continue
            dimension, upgrade_option = row["option"].split("|")
            apply_logic = df["Building"].isin(row["applicable_buildings"])
            # apply upgrade
            df[dimension] = np.where(apply_logic, upgrade_option, df[dimension])

        # report
        cond = report_df["option"] == "All"
        n_total = len(self.buildstock_df_original)
        n_applied = report_df.loc[cond, "applicable_to"].iloc[0]
        n_applied_pct = report_df.loc[cond, "applicable_percent"].iloc[0]
        logger.info(
            f"   Upgrade package has {len(report_df)-1} options and "
            f"was applied to {n_applied} / {n_total} dwelling units ( {n_applied_pct} % )"
        )

        # QC
        n_diff = len(self.buildstock_df_original.compare(df)) - n_applied
        if n_diff > 0:
            raise ValueError(
                f"Relative to baseline buildstock, upgraded buildstock has {n_diff} more rows "
                "of difference than reported."
            )
        elif n_diff < 0:
            logger.warning(
                f"Relative to baseline buildstock, upgraded buildstock has {-1*n_diff} fewer rows "
                "of difference than reported. This is okay, but indicates that some parameters are "
                "being upgraded to the same incumbent option (e.g., LEDs to LEDs). Check that this is intentional."
            )
        else:
            logger.info("No cases of parameter upgraded with incumbent option detected.")

        return df

    @staticmethod
    def _normalize_lists(logic, parent=None):
        """Any list that is not in a or block is considered to be in an and block.
           This block will normalize this pattern by adding "and" wherever required.
        Args:
            logic (_type_): Logic structure (dict, list etc)
            parent (_type_, optional): The parent of the current logic block. If it is a list, and there is no parent,
            the list will be wrapped in a and block.

        Returns:
            _type_: _description_
        """
        if isinstance(logic, list):
            # If it is a single element list, just unwrap and return
            if len(logic) == 1:
                return UpgradesAnalyzer._normalize_lists(logic[0])
            new_logic = [UpgradesAnalyzer._normalize_lists(el) for el in logic]
            return {"and": new_logic} if parent is None else new_logic
        elif isinstance(logic, dict):
            new_dict = {key: UpgradesAnalyzer._normalize_lists(value, parent=key) for key, value in logic.items()}
            return new_dict
        else:
            return logic

    def _get_options_application_count_report(self, logic_dict) -> Optional[pd.DataFrame]:
        """
        For a given logic dictionary, this method will return a report df of options application.
        Example report below:
                           Applied options Applied buildings Cumulative sub Cumulative all
        Number of options
        4                    1, 10, 13, 14         75 (0.1%)      75 (0.1%)      75 (0.1%)
        4                    1, 11, 13, 14       2279 (2.3%)    2354 (2.4%)    2354 (2.4%)
        4                    1, 12, 13, 14        309 (0.3%)    2663 (2.7%)    2663 (2.7%)
        5                  1, 2, 3, 13, 14          8 (0.0%)       8 (0.0%)    2671 (2.7%)
        5                  1, 2, 4, 13, 14        158 (0.2%)     166 (0.2%)    2829 (2.8%)
        5                  1, 2, 5, 13, 14         65 (0.1%)     231 (0.2%)    2894 (2.9%)
        5                  1, 6, 7, 13, 14         23 (0.0%)     254 (0.3%)    2917 (2.9%)
        5                  1, 6, 8, 13, 14         42 (0.0%)     296 (0.3%)    2959 (3.0%)
        """

        n_options = len(logic_dict)
        if n_options < 2:
            return None

        logic_df = pd.DataFrame(logic_dict)
        nbldgs = len(logic_df)
        opts2count = logic_df.apply(lambda row: tuple(indx+1 for indx, val in enumerate(row) if val),
                                    axis=1).value_counts().to_dict()
        cum_count_all = 0
        cum_count = defaultdict(int)
        application_report_rows = []
        for applied_opts in sorted(opts2count.keys(), key=lambda x: (len(x), x)):
            num_opt = len(applied_opts)
            if num_opt == 0:
                continue
            n_applied_bldgs = opts2count[applied_opts]
            cum_count_all += n_applied_bldgs
            cum_count[num_opt] += n_applied_bldgs
            record = {"Number of options": num_opt,
                      "Applied options": ", ".join([f"{logic_df.columns[opt - 1]}" for opt in applied_opts]),
                      "Applied buildings": f"{n_applied_bldgs} ({self._to_pct(n_applied_bldgs, nbldgs)}%)",
                      "Cumulative sub": f"{cum_count[num_opt]} ({self._to_pct(cum_count[num_opt], nbldgs)}%)",
                      "Cumulative all": f"{cum_count_all} ({self._to_pct(cum_count_all, nbldgs)}%)"
                      }
            application_report_rows.append(record)

        assert cum_count_all <= nbldgs, "Cumulative count of options applied is more than total number of buildings."
        if application_report_rows:
            application_report_df = pd.DataFrame(application_report_rows).set_index("Number of options")
            return application_report_df
        return None

    def _get_left_out_report_all(self, upgrade_num):
        cfg = self.get_cfg()
        report_str = ""
        upgrade = cfg["upgrades"][upgrade_num - 1]
        upgrade_name = upgrade.get("upgrade_name")
        header = f"Left Out Report for - Upgrade{upgrade_num}:'{upgrade_name}'"
        report_str += "-" * len(header) + "\n"
        report_str += header + "\n"
        report_str += "-" * len(header) + "\n"
        logic = {"or": []}
        for opt in upgrade["options"]:
            if "apply_logic" in opt:
                logic["or"].append(self._normalize_lists(opt["apply_logic"]))
        if "package_apply_logic" in upgrade:
            logic = {"and": [logic, upgrade["package_apply_logic"]]}
        logic = {"not": logic}  # invert it
        logic = self.parser.normalize_logic(logic)
        logic_array, logic_str = self._get_logic_report(logic)
        footer_len = len(logic_str[-1])
        report_str += "\n".join(logic_str) + "\n"
        report_str += "-" * footer_len + "\n"
        count = logic_array.sum()
        footer_str = f"Overall Not Applied to => {count} ({self._to_pct(count)}%)."
        report_str += footer_str + "\n"
        report_str += "-" * len(footer_str) + "\n"
        return logic_array, report_str

    def get_left_out_report(self, upgrade_num: int, option_num: Optional[int] = None) -> tuple[np.ndarray, str]:
        """Prints detailed report for a particular upgrade (and optionally, an option)
        Args:
            upgrade_num (int): The 1-indexed upgrade for which to print the report.
            option_num (int, optional): The 1-indexed option number for which to print report. Defaults to None, which
                                        will print report for all options.
            normalize_logic (bool, optional): Whether to normalize the logic structure. Defaults to False.
        Returns:
            (np.ndarray, str): Returns a logic array of buildings to which the any of the option applied and report str.
        """
        cfg = self.get_cfg()
        if upgrade_num <= 0 or upgrade_num > len(cfg["upgrades"]) + 1:
            raise ValueError(f"Invalid upgrade {upgrade_num}. Upgrade num is 1-indexed.")

        if option_num is None:
            return self._get_left_out_report_all(upgrade_num)

        self._logic_cache = {}
        if upgrade_num == 0 or option_num == 0:
            raise ValueError(f"Upgrades and options are 1-indexed.Got {upgrade_num} {option_num}")
        report_str = ""
        try:
            upgrade = cfg["upgrades"][upgrade_num - 1]
            opt = upgrade["options"][option_num - 1]
        except (KeyError, IndexError, TypeError) as e:
            raise ValueError(f"The yaml doesn't have {upgrade_num}/{option_num} upgrade/option") from e

        upgrade_name = upgrade.get("upgrade_name")
        header = f"Left Out Report for - Upgrade{upgrade_num}:'{upgrade_name}', Option{option_num}:'{opt['option']}'"
        report_str += "-" * len(header) + "\n"
        report_str += header + "\n"
        report_str += "-" * len(header) + "\n"
        if "apply_logic" in opt and "package_apply_logic" in upgrade:
            logic = {"not": {"and": [opt["apply_logic"], upgrade["package_apply_logic"]]}}
        elif "apply_logic" in opt:
            logic = {"not": opt["apply_logic"]}
        else:
            logic = {"not": upgrade["package_apply_logic"]}
        logic = self.parser.normalize_logic(logic)

        logic_array, logic_str = self._get_logic_report(logic)
        footer_len = len(logic_str[-1])
        report_str += "\n".join(logic_str) + "\n"
        report_str += "-" * footer_len + "\n"
        count = logic_array.sum()
        footer_str = f"Overall Not Applied to => {count} ({self._to_pct(count)}%)."
        report_str += footer_str + "\n"
        report_str += "-" * len(footer_str) + "\n"
        return logic_array, report_str

    def get_detailed_report(self, upgrade_num: int, option_num: Optional[int] = None,
                            normalize_logic: bool = False) -> tuple[np.ndarray, str]:
        """Prints detailed report for a particular upgrade (and optionally, an option)
        Args:
            upgrade_num (int): The 1-indexed upgrade for which to print the report.
            option_num (int, optional): The 1-indexed option number for which to print report. Defaults to None, which
                                        will print report for all options.
            normalize_logic (bool, optional): Whether to normalize the logic structure. Defaults to False.
        Returns:
            (np.ndarray, str): Returns a logic array of buildings to which the any of the option applied and report str.
        """
        cfg = self.get_cfg()
        if upgrade_num <= 0 or upgrade_num > len(cfg["upgrades"]) + 1:
            raise ValueError(f"Invalid upgrade {upgrade_num}. Upgrade num is 1-indexed.")

        if option_num is None:
            return self._get_detailed_report_all(upgrade_num, normalize_logic=normalize_logic)

        self._logic_cache = {}
        if upgrade_num == 0 or option_num == 0:
            raise ValueError(f"Upgrades and options are 1-indexed.Got {upgrade_num} {option_num}")
        report_str = ""
        try:
            upgrade = cfg["upgrades"][upgrade_num - 1]
            opt = upgrade["options"][option_num - 1]
        except (KeyError, IndexError, TypeError) as e:
            raise ValueError(f"The yaml doesn't have {upgrade_num}/{option_num} upgrade/option") from e

        upgrade_name = upgrade.get("upgrade_name")
        header = f"Option Apply Report for - Upgrade{upgrade_num}:'{upgrade_name}', Option{option_num}:'{opt['option']}'"
        report_str += "-" * len(header) + "\n"
        report_str += header + "\n"
        report_str += "-" * len(header) + "\n"
        if "apply_logic" in opt:
            logic = UpgradesAnalyzer._normalize_lists(opt["apply_logic"])
            logic = self.parser.normalize_logic(logic) if normalize_logic else logic
            logic_array, logic_str = self._get_logic_report(logic)
            footer_len = len(logic_str[-1])
            report_str += "\n".join(logic_str) + "\n"
            report_str += "-" * footer_len + "\n"
        else:
            logic_array = np.ones((1, self.total_samples), dtype=bool)

        if "package_apply_logic" in upgrade:
            logic = UpgradesAnalyzer._normalize_lists(upgrade["package_apply_logic"])
            logic = self.parser.normalize_logic(logic) if normalize_logic else logic
            package_logic_array, logic_str = self._get_logic_report(logic)
            footer_len = len(logic_str[-1])
            report_str += "Package Apply Logic Report" + "\n"
            report_str += "--------------------------" + "\n"
            report_str += "\n".join(logic_str) + "\n"
            report_str += "-" * footer_len + "\n"
            logic_array = logic_array & package_logic_array

        count = logic_array.sum()
        footer_str = f"Overall applied to => {count} ({self._to_pct(count)}%)."
        report_str += footer_str + "\n"
        report_str += "-" * len(footer_str) + "\n"
        return logic_array, report_str

    def _get_detailed_report_all(self, upgrade_num, normalize_logic: bool = False):
        conds_dict = {}
        grouped_conds_dict = {}
        cfg = self.get_cfg()
        report_str = ""
        n_options = len(cfg["upgrades"][upgrade_num - 1]["options"])
        or_array = np.zeros((1, self.total_samples), dtype=bool)
        and_array = np.ones((1, self.total_samples), dtype=bool)
        for option_indx in range(n_options):
            logic_array, sub_report_str = self.get_detailed_report(upgrade_num, option_indx + 1,
                                                                   normalize_logic=normalize_logic)
            opt_name, _ = self._get_para_option(cfg["upgrades"][upgrade_num - 1]["options"][option_indx]["option"])
            report_str += sub_report_str + "\n"
            conds_dict[option_indx + 1] = logic_array
            if opt_name not in grouped_conds_dict:
                grouped_conds_dict[opt_name] = logic_array
            else:
                grouped_conds_dict[opt_name] |= logic_array
            or_array |= logic_array
            and_array &= logic_array
        and_count = and_array.sum()
        or_count = or_array.sum()
        report_str += f"All of the options (and-ing) were applied to: {and_count} ({self._to_pct(and_count)}%)" + "\n"
        report_str += f"Any of the options (or-ing) were applied to: {or_count} ({self._to_pct(or_count)}%)" + "\n"

        option_app_report = self._get_options_application_count_report(grouped_conds_dict)
        if option_app_report is not None:
            report_str += "-" * 80 + "\n"
            report_str += f"Report of how the {len(grouped_conds_dict)} options were applied to the buildings." + "\n"
            report_str += tabulate(option_app_report, headers='keys', tablefmt='grid', maxcolwidths=50) + "\n"

        detailed_app_report_df = self._get_options_application_count_report(conds_dict)
        if detailed_app_report_df is not None:
            report_str += "-" * 80 + "\n"
            if len(detailed_app_report_df) > 100:
                report_str += "Detailed report is skipped because of too many rows. " + "\n"
                report_str += "Ask the developer if this is useful to see" + "\n"
            else:
                report_str += f"Detailed report of how the {n_options} options were applied to the buildings." + "\n"
                report_str += tabulate(detailed_app_report_df, headers='keys', tablefmt='grid', maxcolwidths=50) + "\n"
        return or_array, report_str

    def _to_pct(self, count, total=None):
        total = total or self.total_samples
        return round(100 * count / total, 1)

    def _get_logic_report(self, logic, parent=None):
        logic_array = np.ones((1, self.total_samples), dtype=bool)
        logic_str = [""]
        if parent not in [None, "and", "or", "not"]:
            raise ValueError(f"Logic can only include and, or, not blocks. {parent} found in {logic}.")
        if isinstance(logic, str):
            logic_condition = UpgradesAnalyzer._get_eq_str(logic)
            logic_array = self.buildstock_df.eval(logic_condition, engine="python")
            count = logic_array.sum()
            logic_str = [logic + " => " + f"{count} ({self._to_pct(count)}%)"]
        elif isinstance(logic, list):
            if len(logic) == 1:
                logic_array, logic_str = self._get_logic_report(logic[0])
            elif parent in ["or"]:

                def reducer(l1, l2):
                    ll2 = self._get_logic_report(l2)
                    return l1[0] | ll2[0], l1[1] + ll2[1]

                logic_array, logic_str = reduce(reducer, logic, (np.zeros((1, self.total_samples), dtype=bool), []))
            else:

                def reducer(l1, l2):
                    ll2 = self._get_logic_report(l2)
                    return l1[0] & ll2[0], l1[1] + ll2[1]

                logic_array, logic_str = reduce(reducer, logic, (np.ones((1, self.total_samples), dtype=bool), []))
        elif isinstance(logic, dict):
            if len(logic) > 1:
                raise ValueError(f"Dicts cannot have more than one keys. {logic} has.")
            key = list(logic.keys())[0]
            sub_logic = self._get_logic_report(logic[key], parent=key)
            sub_logic_str = sub_logic[1]
            logic_array = sub_logic[0]
            if key == "not":
                logic_array = ~logic_array
            count = logic_array.sum()
            header_str = key + " => " + f"{count} ({self._to_pct(count)}%)"
            logic_str = [header_str] + [f"  {ls}" for ls in sub_logic_str]

        count = logic_array.sum()
        if parent is None and isinstance(logic, list) and len(logic) > 1:
            logic_str[0] = logic_str[0] + " => " + f"{count} ({self._to_pct(count)}%)"

        return logic_array, logic_str

    def save_detailed_report_all(self, file_path: str, logic_transform=None):
        """Save detailed text based upgrade report.

        Args:
            file_path (str): Output file.
        """
        cfg = self.get_cfg()
        all_report = ""
        for upgrade in range(1, len(cfg["upgrades"]) + 1):
            logger.info(f"Getting report for upgrade {upgrade}")
            _, report = self.get_detailed_report(upgrade, normalize_logic=logic_transform)
            all_report += report + "\n"
        with open(file_path, "w") as file:
            file.write(all_report)

Static methods

def get_mentioned_parameters(logic: Union[list, dict, str]) ‑> list

Returns the list of all parameters referenced in a logic block. Useful for debugging

Args

logic :  Union[list, dict, str]
The apply logic

Raises

ValueError
If the input logic is invalid

Returns

List
The list of parameters
Expand source code
@staticmethod
def get_mentioned_parameters(logic: Union[list, dict, str]) -> list:
    """
    Returns the list of all parameters referenced in a logic block. Useful for debugging

    Args:
        logic ( Union[list, dict, str]): The apply logic

    Raises:
        ValueError: If the input logic is invalid

    Returns:
        List: The list of parameters
    """
    if not logic:
        return []

    if isinstance(logic, str):
        return [UpgradesAnalyzer._get_para_option(logic)[0]]
    elif isinstance(logic, list):
        all_params = []
        for el in logic:
            all_params.extend(UpgradesAnalyzer.get_mentioned_parameters(el))
        return list(dict.fromkeys(all_params))  # remove duplicates while maintaining order
    elif isinstance(logic, dict):
        return UpgradesAnalyzer.get_mentioned_parameters(list(logic.values())[0])
    else:
        raise ValueError("Invalid logic type")

Methods

def get_cfg(self) ‑> dict

Get the buildstock configuration file as a dictionary object.

Returns

dict
The buildstock configuration file.
Expand source code
def get_cfg(self) -> dict:
    """Get the buildstock configuration file as a dictionary object.

    Returns:
        dict: The buildstock configuration file.
    """
    with open(self.yaml_file) as f:
        config = yaml.load(f, Loader=yaml.SafeLoader)
    return config
def get_detailed_report(self, upgrade_num: int, option_num: Optional[int] = None, normalize_logic: bool = False) ‑> tuple[numpy.ndarray, str]

Prints a detailed report for a particular upgrade (and optionally, an option)

Args

upgrade_num : int
The 1-indexed upgrade for which to print the report.
option_num : int, optional
The 1-indexed option number for which to print the report. Defaults to None, which will print the report for all options.
normalize_logic : bool, optional
Whether to normalize the logic structure. Defaults to False.

Returns

(np.ndarray, str): A logic array of buildings to which any of the options applied, and the report string.

Expand source code
def get_detailed_report(self, upgrade_num: int, option_num: Optional[int] = None,
                        normalize_logic: bool = False) -> tuple[np.ndarray, str]:
    """Prints detailed report for a particular upgrade (and optionally, an option)
    Args:
        upgrade_num (int): The 1-indexed upgrade for which to print the report.
        option_num (int, optional): The 1-indexed option number for which to print report. Defaults to None, which
                                    will print report for all options.
        normalize_logic (bool, optional): Whether to normalize the logic structure. Defaults to False.
    Returns:
        (np.ndarray, str): Returns a logic array of buildings to which the any of the option applied and report str.
    """
    cfg = self.get_cfg()
    if upgrade_num <= 0 or upgrade_num > len(cfg["upgrades"]) + 1:
        raise ValueError(f"Invalid upgrade {upgrade_num}. Upgrade num is 1-indexed.")

    if option_num is None:
        return self._get_detailed_report_all(upgrade_num, normalize_logic=normalize_logic)

    self._logic_cache = {}
    if upgrade_num == 0 or option_num == 0:
        raise ValueError(f"Upgrades and options are 1-indexed.Got {upgrade_num} {option_num}")
    report_str = ""
    try:
        upgrade = cfg["upgrades"][upgrade_num - 1]
        opt = upgrade["options"][option_num - 1]
    except (KeyError, IndexError, TypeError) as e:
        raise ValueError(f"The yaml doesn't have {upgrade_num}/{option_num} upgrade/option") from e

    upgrade_name = upgrade.get("upgrade_name")
    header = f"Option Apply Report for - Upgrade{upgrade_num}:'{upgrade_name}', Option{option_num}:'{opt['option']}'"
    report_str += "-" * len(header) + "\n"
    report_str += header + "\n"
    report_str += "-" * len(header) + "\n"
    if "apply_logic" in opt:
        logic = UpgradesAnalyzer._normalize_lists(opt["apply_logic"])
        logic = self.parser.normalize_logic(logic) if normalize_logic else logic
        logic_array, logic_str = self._get_logic_report(logic)
        footer_len = len(logic_str[-1])
        report_str += "\n".join(logic_str) + "\n"
        report_str += "-" * footer_len + "\n"
    else:
        logic_array = np.ones((1, self.total_samples), dtype=bool)

    if "package_apply_logic" in upgrade:
        logic = UpgradesAnalyzer._normalize_lists(upgrade["package_apply_logic"])
        logic = self.parser.normalize_logic(logic) if normalize_logic else logic
        package_logic_array, logic_str = self._get_logic_report(logic)
        footer_len = len(logic_str[-1])
        report_str += "Package Apply Logic Report" + "\n"
        report_str += "--------------------------" + "\n"
        report_str += "\n".join(logic_str) + "\n"
        report_str += "-" * footer_len + "\n"
        logic_array = logic_array & package_logic_array

    count = logic_array.sum()
    footer_str = f"Overall applied to => {count} ({self._to_pct(count)}%)."
    report_str += footer_str + "\n"
    report_str += "-" * len(footer_str) + "\n"
    return logic_array, report_str
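For example, assuming `ua` is an `UpgradesAnalyzer` instance:

```
# Apply-logic report for option 2 of upgrade 1 (both 1-indexed)
logic_array, report = ua.get_detailed_report(upgrade_num=1, option_num=2)
print(report)
```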
def get_left_out_report(self, upgrade_num: int, option_num: Optional[int] = None) ‑> tuple[numpy.ndarray, str]

Prints the left out report for a particular upgrade (and optionally, an option), i.e. which buildings the upgrade/option did not apply to and why.

Args

upgrade_num : int
The 1-indexed upgrade for which to print the report.
option_num : int, optional
The 1-indexed option number for which to print the report. Defaults to None, which will print the report for all options.

Returns

(np.ndarray, str): A logic array of buildings to which the upgrade/option did not apply, and the report string.

Expand source code
def get_left_out_report(self, upgrade_num: int, option_num: Optional[int] = None) -> tuple[np.ndarray, str]:
    """Prints detailed report for a particular upgrade (and optionally, an option)
    Args:
        upgrade_num (int): The 1-indexed upgrade for which to print the report.
        option_num (int, optional): The 1-indexed option number for which to print report. Defaults to None, which
                                    will print report for all options.
        normalize_logic (bool, optional): Whether to normalize the logic structure. Defaults to False.
    Returns:
        (np.ndarray, str): Returns a logic array of buildings to which the any of the option applied and report str.
    """
    cfg = self.get_cfg()
    if upgrade_num <= 0 or upgrade_num > len(cfg["upgrades"]) + 1:
        raise ValueError(f"Invalid upgrade {upgrade_num}. Upgrade num is 1-indexed.")

    if option_num is None:
        return self._get_left_out_report_all(upgrade_num)

    self._logic_cache = {}
    if upgrade_num == 0 or option_num == 0:
        raise ValueError(f"Upgrades and options are 1-indexed.Got {upgrade_num} {option_num}")
    report_str = ""
    try:
        upgrade = cfg["upgrades"][upgrade_num - 1]
        opt = upgrade["options"][option_num - 1]
    except (KeyError, IndexError, TypeError) as e:
        raise ValueError(f"The yaml doesn't have {upgrade_num}/{option_num} upgrade/option") from e

    upgrade_name = upgrade.get("upgrade_name")
    header = f"Left Out Report for - Upgrade{upgrade_num}:'{upgrade_name}', Option{option_num}:'{opt['option']}'"
    report_str += "-" * len(header) + "\n"
    report_str += header + "\n"
    report_str += "-" * len(header) + "\n"
    if "apply_logic" in opt and "package_apply_logic" in upgrade:
        logic = {"not": {"and": [opt["apply_logic"], upgrade["package_apply_logic"]]}}
    elif "apply_logic" in opt:
        logic = {"not": opt["apply_logic"]}
    else:
        logic = {"not": upgrade["package_apply_logic"]}
    logic = self.parser.normalize_logic(logic)

    logic_array, logic_str = self._get_logic_report(logic)
    footer_len = len(logic_str[-1])
    report_str += "\n".join(logic_str) + "\n"
    report_str += "-" * footer_len + "\n"
    count = logic_array.sum()
    footer_str = f"Overall Not Applied to => {count} ({self._to_pct(count)}%)."
    report_str += footer_str + "\n"
    report_str += "-" * len(footer_str) + "\n"
    return logic_array, report_str
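For example, assuming `ua` is an `UpgradesAnalyzer` instance:

```
# Which buildings did upgrade 1 NOT apply to, and why
left_out_array, report = ua.get_left_out_report(upgrade_num=1)
print(report)
```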
def get_report(self, upgrade_num: Optional[int] = None) ‑> pandas.core.frame.DataFrame

Analyzes how many buildings the various options in all the upgrades will apply to and returns a report in DataFrame format.

Args

upgrade_num
Numeric index of upgrade (1-indexed). If None, all upgrades are assessed

Returns

pd.DataFrame
The upgrade and options report.
Expand source code
def get_report(self, upgrade_num: Optional[int] = None) -> pd.DataFrame:
    """Analyses how many buildings various options in all the upgrades is going to apply to and returns
    a report in DataFrame format.
    Args:
        upgrade_num: Numeric index of upgrade (1-indexed). If None, all upgrades are assessed

    Returns:
        pd.DataFrame: The upgrade and options report.

    """

    def _get_records(indx, upgrade):
        records = []
        logger.info(f"Analyzing upgrade {indx + 1}")
        all_applied_bldgs = np.zeros((1, self.total_samples), dtype=bool)
        package_applied_bldgs = np.ones((1, self.total_samples), dtype=bool)
        if "package_apply_logic" in upgrade:
            pkg_flat_logic = UpgradesAnalyzer._normalize_lists(upgrade["package_apply_logic"])
            package_applied_bldgs = self._reduce_logic(pkg_flat_logic, parent=None)

        for opt_index, option in enumerate(upgrade["options"]):
            applied_bldgs = np.ones((1, self.total_samples), dtype=bool)
            if "apply_logic" in option:
                flat_logic = UpgradesAnalyzer._normalize_lists(option["apply_logic"])
                applied_bldgs &= self._reduce_logic(flat_logic, parent=None)
            else:
                applied_bldgs = np.ones((1, self.total_samples), dtype=bool)

            applied_bldgs &= package_applied_bldgs
            count = applied_bldgs.sum()
            all_applied_bldgs |= applied_bldgs
            record = {
                "upgrade": indx + 1,
                "upgrade_name": upgrade["upgrade_name"],
                "option_num": opt_index + 1,
                "option": option["option"],
                "applicable_to": count,
                "applicable_percent": self._to_pct(count),
                "applicable_buildings": set(self.buildstock_df.loc[applied_bldgs[0]].index),
            }
            records.append(record)

        count = all_applied_bldgs.sum()
        record = {
            "upgrade": indx + 1,
            "upgrade_name": upgrade["upgrade_name"],
            "option_num": -1,
            "option": "All",
            "applicable_to": count,
            "applicable_buildings": set(self.buildstock_df.loc[all_applied_bldgs[0]].index),
            "applicable_percent": self._to_pct(count),
        }
        records.append(record)
        return records

    cfg = self.get_cfg()
    self._logic_cache = {}
    if "upgrades" not in cfg:
        raise ValueError("The project yaml has no upgrades defined")

    max_upg = len(cfg["upgrades"]) + 1
    if upgrade_num is not None:
        if upgrade_num <= 0 or upgrade_num >= max_upg:
            raise ValueError(f"Invalid upgrade {upgrade_num}. Valid upgrade_num = {list(range(1, max_upg))}.")

    records = []
    for indx, upgrade in enumerate(cfg["upgrades"]):
        if upgrade_num is None or upgrade_num == indx + 1:
            records += _get_records(indx, upgrade)
        else:
            continue

    report_df = pd.DataFrame.from_records(records)
    return report_df
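For example, to report on a single upgrade:

```
report_df = ua.get_report(upgrade_num=1)
print(report_df[["option", "applicable_to", "applicable_percent"]])
```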
def get_upgraded_buildstock(self, upgrade_num)

Returns a copy of the buildstock dataframe with the given upgrade applied, i.e. with the applicable parameters replaced by their upgrade options.

Expand source code
def get_upgraded_buildstock(self, upgrade_num):
    """Returns a copy of the buildstock dataframe with the given upgrade applied, i.e. with the
    applicable parameters replaced by their upgrade options.
    Args:
        upgrade_num: The 1-indexed upgrade to apply.
    """
    report_df = self.get_report(upgrade_num)
    upgrade_name = report_df["upgrade_name"].unique()[0]
    logger.info(f" * Upgraded buildstock for upgrade {upgrade_num} : {upgrade_name}")

    df = self.buildstock_df_original.copy()
    for idx, row in report_df.iterrows():
        if row["option"] == "All":
            continue
        dimension, upgrade_option = row["option"].split("|")
        apply_logic = df["Building"].isin(row["applicable_buildings"])
        # apply upgrade
        df[dimension] = np.where(apply_logic, upgrade_option, df[dimension])

    # report
    cond = report_df["option"] == "All"
    n_total = len(self.buildstock_df_original)
    n_applied = report_df.loc[cond, "applicable_to"].iloc[0]
    n_applied_pct = report_df.loc[cond, "applicable_percent"].iloc[0]
    logger.info(
        f"   Upgrade package has {len(report_df)-1} options and "
        f"was applied to {n_applied} / {n_total} dwelling units ( {n_applied_pct} % )"
    )

    # QC
    n_diff = len(self.buildstock_df_original.compare(df)) - n_applied
    if n_diff > 0:
        raise ValueError(
            f"Relative to baseline buildstock, upgraded buildstock has {n_diff} more rows "
            "of difference than reported."
        )
    elif n_diff < 0:
        logger.warning(
            f"Relative to baseline buildstock, upgraded buildstock has {-1*n_diff} fewer rows "
            "of difference than reported. This is okay, but indicates that some parameters are "
            "being upgraded to the same incumbent option (e.g., LEDs to LEDs). Check that this is intentional."
        )
    else:
        logger.info("No cases of parameter upgraded with incumbent option detected.")

    return df
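For example (the output path is illustrative):

```
# Buildstock with upgrade 1's options applied, using the original csv's column names
upgraded_df = ua.get_upgraded_buildstock(upgrade_num=1)
upgraded_df.to_csv("upgraded_buildstock.csv", index=False)
```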
def print_unique_characteristic(self, upgrade_num: int, name: str, base_bldg_list: list, compare_bldg_list: list)

Finds and prints what's unique among a list of buildings compared to baseline buildings. Useful for debugging why a certain set of buildings' energy consumption went up for an upgrade, for example.

Args

upgrade_num : int
The upgrade for which the analysis is being done.
name : str
Some name to identify the building set (only used for printing)
base_bldg_list : list
The set of 'normal' building ids to compare against.
compare_bldg_list : list
The set of buildings whose unique characteristics are to be printed.
Expand source code
def print_unique_characteristic(self, upgrade_num: int, name: str, base_bldg_list: list, compare_bldg_list: list):
    """Finds and prints what's unique among a list of buildings compared to baseline buildings.
       Useful for debugging why a certain set of buildings' energy consumption went up for an upgrade, for example.
    Args:
        upgrade_num (int): The upgrade for which the analysis is being done.
        name (str): Some name to identify the building set (only used for printing)
        base_bldg_list (list): The set of 'normal' building ids to compare against.
        compare_bldg_list (list): The set of buildings whose unique characteristics are to be printed.
    """
    cfg = self.get_cfg()
    if upgrade_num == 0:
        raise ValueError(f"Upgrades are 1-indexed. Got {upgrade_num}")

    try:
        upgrade_cfg = cfg["upgrades"][upgrade_num - 1]
    except (KeyError, IndexError) as e:
        raise ValueError(f"Invalid upgrade {upgrade_num}. Upgrades are 1-indexed, FYI.") from e

    parameter_list = []
    for option_cfg in upgrade_cfg["options"]:
        parameter_list.append(UpgradesAnalyzer._get_para_option(option_cfg["option"])[0])
        parameter_list.extend(UpgradesAnalyzer.get_mentioned_parameters(option_cfg.get("apply_logic")))
    res_df = self.buildstock_df
    # remove duplicates (dict.fromkeys) and remove parameters not existing in buildstock_df
    parameter_list = [param for param in dict.fromkeys(parameter_list) if param in res_df.columns]
    compare_df = res_df.loc[compare_bldg_list]
    base_df = res_df.loc[base_bldg_list]
    print(f"Comparing {len(compare_df)} buildings with {len(base_df)} other buildings.")
    unique_vals_dict: dict[tuple[str, ...], set[tuple[str, ...]]] = {}
    for col in res_df.columns:
        no_change_set = set(compare_df[col].fillna("").unique())
        other_set = set(base_df[col].fillna("").unique())
        if only_in_no_change := no_change_set - other_set:
            print(f"Only {name} buildings have {col} in {sorted(only_in_no_change)}")
            unique_vals_dict[(col,)] = {(entry,) for entry in only_in_no_change}

    if not unique_vals_dict:
        print("No 1-column unique chracteristics found.")

    for combi_size in range(2, min(len(parameter_list) + 1, 5)):
        print(f"Checking {combi_size} column combinations out of {parameter_list}")
        found_uniq_chars = 0
        for cols in combinations(parameter_list, combi_size):
            compare_tups = compare_df[list(cols)].fillna("").drop_duplicates().itertuples(index=False, name=None)
            other_tups = base_df[list(cols)].fillna("").drop_duplicates().itertuples(index=False, name=None)
            only_in_compare = set(compare_tups) - set(other_tups)

            # remove cases arising from uniqueness already found with a smaller subset of cols
            for sub_combi_size in range(1, len(cols)):
                for sub_cols in combinations(cols, sub_combi_size):
                    if sub_cols in unique_vals_dict:
                        new_set = set()
                        for val in only_in_compare:
                            relevant_val = tuple(val[cols.index(sub_col)] for sub_col in sub_cols)
                            if relevant_val not in unique_vals_dict[sub_cols]:
                                new_set.add(val)
                        only_in_compare = new_set

            if only_in_compare:
                print(f"Only {name} buildings have {cols} in {sorted(only_in_compare)} \n")
                found_uniq_chars += 1
                unique_vals_dict[cols] = only_in_compare

        if not found_uniq_chars:
            print(f"No {combi_size}-column unique chracteristics found.")
def save_detailed_report_all(self, file_path: str, logic_transform=None)

Save detailed text based upgrade report.

Args

file_path : str
Output file.
logic_transform : optional
If truthy, the apply logic is normalized before reporting (passed to get_detailed_report as normalize_logic).
Expand source code
def save_detailed_report_all(self, file_path: str, logic_transform=None):
    """Save detailed text based upgrade report.

    Args:
        file_path (str): Output file.
    """
    cfg = self.get_cfg()
    all_report = ""
    for upgrade in range(1, len(cfg["upgrades"]) + 1):
        logger.info(f"Getting report for upgrade {upgrade}")
        _, report = self.get_detailed_report(upgrade, normalize_logic=logic_transform)
        all_report += report + "\n"
    with open(file_path, "w") as file:
        file.write(all_report)
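For example (the output path is illustrative):

```
# Write the detailed report for every upgrade to a single text file
ua.save_detailed_report_all("detailed_report_all.txt")
```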