Module buildstock_query.utility_query
Expand source code
import buildstock_query.main as main
import logging
from typing import List, Any, Tuple, Optional, Union, Sequence, Literal
import pandas as pd
import sqlalchemy as sa
from sqlalchemy.sql import functions as safunc
from collections import defaultdict
from buildstock_query.schema.query_params import UtilityTSQuery, TSQuery
from buildstock_query.schema.helpers import gather_params
from buildstock_query.schema.utilities import AnyColType, AnyTableType, MappedColumn
from buildstock_query.helpers import read_csv
from pydantic import Field, BaseModel, ValidationError, validate_arguments
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TimeTuple(BaseModel):
    # month must be 1-12, is_weekend must be 0 or 1, hour must be 0-23
month: int = Field(..., ge=1, le=12)
is_weekend: int = Field(..., ge=0, le=1)
hour: int = Field(..., ge=0, le=23)
def __hash__(self):
return hash((self.month, self.is_weekend, self.hour))
class TOURate(BaseModel):
data: dict[TimeTuple, float] = Field(..., example={TimeTuple(month=1, is_weekend=0, hour=3): 0.5})
raw_dict: dict[Tuple[int, int, int], float] = Field(..., example={(1, 0, 3): 0.5})
def __init__(self, rate_dict: dict[Tuple[int, int, int], float]):
data: dict[TimeTuple, float] = {}
for key, value in rate_dict.items():
try:
data[TimeTuple(month=key[0], is_weekend=key[1], hour=key[2])] = value
except ValidationError as e:
raise ValueError(f"Invalid key {key} in rate_dict. Make sure the keys are"
" (month (1 to 12), is_weekend (0 or 1), hour_of_day (0 to 23)") from e
super().__init__(data=data, raw_dict=rate_dict)
class BuildStockUtility:
"""
Class to perform electric utility centric queries.
"""
def __init__(self, buildstock_query: 'main.BuildStockQuery',
eia_mapping_year: int = 2018, eia_mapping_version: int = 1):
"""
Class to perform electric utility centric queries
Args:
eia_mapping_year: The year of the EIA form 861 service territory map to use when mapping to utility \
service territories. Currently, only 2018 and 2012 are valid years.
eia_mapping_version: The EIA mapping version to use.
"""
self._bsq = buildstock_query
self._agg = buildstock_query.agg
self._group_query_id = 0
self.eia_mapping_year = eia_mapping_year
self.eia_mapping_version = eia_mapping_version
self._cache: dict = defaultdict()
def _aggregate_ts_by_map(self,
map_table_name: str,
baseline_column_name: str,
map_column_name: str,
id_column: str,
id_list: Sequence[Any],
params: UtilityTSQuery):
new_table = self._bsq._get_table(map_table_name)
new_column = self._bsq._get_column(map_column_name, table_name=new_table)
baseline_column = self._bsq._get_column(baseline_column_name, self._bsq.bs_table)
params.group_by = [new_table.c[id_column]] + list(params.group_by)
params.weights = list(params.weights) + [new_table.c['weight']]
params.join_list = [(new_table, baseline_column, new_column)] + list(params.join_list)
logger.info(f"Will submit request for {id_list}")
GS = params.query_group_size
id_list_batches = [id_list[i:i + GS] for i in range(0, len(id_list), GS)]
results_array = []
for current_ids in id_list_batches:
if len(current_ids) == 1:
current_ids = current_ids[0]
new_params = TSQuery(enduses=params.enduses,
group_by=params.group_by,
upgrade_id=params.upgrade_id,
sort=params.sort,
join_list=params.join_list,
weights=params.weights,
restrict=[(new_table.c[id_column], current_ids)] + list(params.restrict),
collapse_ts=params.collapse_ts,
timestamp_grouping_func=params.timestamp_grouping_func,
limit=params.limit,
split_enduses=params.split_enduses,
get_quartiles=params.get_quartiles,
get_query_only=False if params.split_enduses else True,
)
logger.info(f"Submitting query for {current_ids}")
result = self._agg.aggregate_timeseries(params=new_params)
results_array.append(result)
if params.get_query_only:
return results_array
if params.split_enduses:
            # In this case, results_array will contain the result dataframes
logger.info("Concatenating the results from all IDs")
all_dfs = pd.concat(results_array)
return all_dfs
else:
# In this case, results_array will contain the queries
batch_query_id = self._bsq.submit_batch_query(results_array)
return self._bsq.get_batch_query_result(batch_id=batch_query_id)
def get_eiaid_map(self) -> tuple[str, str, str]:
if self.eia_mapping_version == 1:
map_table_name = 'eiaid_weights'
map_baseline_column = f'{self._bsq._char_prefix}county'
map_eiaid_column = 'county'
elif self.eia_mapping_version == 2:
map_table_name = 'v2_eiaid_weights'
map_baseline_column = f'{self._bsq._char_prefix}county'
map_eiaid_column = 'county'
elif self.eia_mapping_version == 3:
map_table_name = 'v3_eiaid_weights_%d' % (self.eia_mapping_year)
map_baseline_column = f'{self._bsq._char_prefix}county'
map_eiaid_column = 'county'
else:
raise ValueError("Invalid mapping_version")
return map_table_name, map_baseline_column, map_eiaid_column
@gather_params(UtilityTSQuery)
def aggregate_ts_by_eiaid(self, params: UtilityTSQuery):
"""
Aggregates the timeseries result, grouping by utilities.
Args:
eiaid_list: The list of utility ids (EIAID) assigned by EIA.
enduses: The list of enduses to aggregate
group_by: Additional columns to group the aggregation by
mapping_version: Version of eiaid mapping to use. After the spatial refactor upgrade, version two
should be used
get_query_only: If set to true, returns the list of queries to run instead of the result.
            query_group_size: The number of eiaids to be grouped together when running Athena queries. This should
                be set as large as possible without causing query timeouts.
            split_enduses: Query each enduse separately to spread the load on Athena.
Returns:
Pandas dataframe with the aggregated timeseries and the requested enduses grouped by utilities
"""
eiaid_map_table_name, map_baseline_column, map_eiaid_column = self.get_eiaid_map()
if not params.enduses:
raise ValueError("Need to provide enduses")
id_column = 'eiaid'
if params.query_group_size is None:
params.query_group_size = min(100, len(params.eiaid_list))
return self._aggregate_ts_by_map(map_table_name=eiaid_map_table_name,
baseline_column_name=map_baseline_column,
map_column_name=map_eiaid_column,
id_column=id_column,
id_list=params.eiaid_list,
params=params)
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def aggregate_unit_counts_by_eiaid(self, *, eiaid_list: list[str],
group_by: list[Union[AnyColType, tuple[str, str]]] = [],
get_query_only: bool = False):
"""
        Returns the counts of dwelling units, grouped by eiaid and any additional group_by columns provided.
Args:
eiaid_list: The list of utility ids (EIAID) to aggregate for
group_by: Additional columns to group by
mapping_version: Version of eiaid mapping to use. After the spatial refactor upgrade, version two
should be used
get_query_only: If set to true, returns the query instead of the result
Returns:
            Pandas dataframe with the unit counts.
"""
logger.info("Aggregating unit counts by eiaid")
        group_by = group_by or []
        eiaid_map_table_name, map_baseline_column, map_eiaid_column = self.get_eiaid_map()
restrict = [('eiaid', eiaid_list)]
eiaid_col = self._bsq._get_column("eiaid", eiaid_map_table_name)
result = self._agg.aggregate_annual(enduses=[], group_by=[eiaid_col] + group_by,
sort=True,
join_list=[(eiaid_map_table_name, map_baseline_column, map_eiaid_column)],
weights=['weight'],
restrict=restrict,
get_query_only=get_query_only)
return result
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def aggregate_annual_by_eiaid(self, enduses: Sequence[AnyColType], group_by: Optional[List[str]] = None,
get_query_only: bool = False, get_nonzero_count: bool = False):
"""
Aggregates the annual consumption in the baseline table, grouping by all the utilities
Args:
enduses: The list of enduses to aggregate
group_by: Additional columns to group the aggregation by
mapping_version: Version of eiaid mapping to use. After the spatial refactor upgrade, version two
should be used
get_query_only: If set to true, returns the list of queries to run instead of the result.
Returns:
Pandas dataframe with the annual sum of the requested enduses, grouped by utilities
"""
eiaid_map_table_name, map_baseline_column, map_eiaid_column = self.get_eiaid_map()
join_list = [(eiaid_map_table_name, map_baseline_column, map_eiaid_column)]
group_by = [] if group_by is None else group_by
group_by_cols = [self._bsq._get_column(col, self._bsq.bs_table) for col in group_by]
eiaid_col = self._bsq._get_column("eiaid", eiaid_map_table_name)
result = self._agg.aggregate_annual(enduses=enduses, group_by=[eiaid_col] + group_by_cols,
join_list=join_list,
weights=['weight'],
sort=True,
get_query_only=get_query_only,
get_nonzero_count=get_nonzero_count)
return result
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def get_filtered_results_csv_by_eiaid(
self, eiaids: List[str], get_query_only: bool = False):
"""
        Returns the portion of the results csv that belongs to the given list of utilities.
        Args:
            eiaids: The list of utility EIAIDs.
mapping_version: Version of eiaid mapping to use. After the spatial refactor upgrade, version two
should be used
get_query_only: If set to true, returns the list of queries to run instead of the result.
Returns:
            Pandas dataframe that is the subset of the results csv belonging to the provided list of utilities.
"""
eiaid_map_table_name, map_baseline_column, map_eiaid_column = self.get_eiaid_map()
query = sa.select(['*']).select_from(self._bsq.bs_table)
query = self._bsq._add_join(query, [(eiaid_map_table_name, map_baseline_column, map_eiaid_column)])
query = self._bsq._add_restrict(query, [("eiaid", eiaids)])
query = query.where(self._bsq._get_column("weight") > 0)
if get_query_only:
return self._bsq._compile(query)
res = self._bsq.execute(query)
return res
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def get_eiaids(self, restrict: Optional[List[Tuple[str, List]]] = None) -> list[str]:
"""
Returns the list of eiaids
Args:
            restrict: The list of where conditions to restrict the results to, specified as a list of tuples.
Example: `[('state',['VA','AZ']), ("build_existing_model.lighting",['60% CFL']), ...]`
mapping_version: Version of eiaid mapping to use. After the spatial refactor upgrade, version two
should be used
Returns:
            The list of eiaids (as strings) matching the provided restriction.
"""
restrict = list(restrict) if restrict else []
eiaid_map_table_name, map_baseline_column, map_eiaid_column = self.get_eiaid_map()
eiaid_col = self._bsq._get_column("eiaid", eiaid_map_table_name)
        cache_key = f"{self._bsq.db_name}/{eiaid_map_table_name}"
        if 'eiaids' not in self._cache:
            self._cache['eiaids'] = {}
        if cache_key in self._cache['eiaids']:
            return self._cache['eiaids'][cache_key]
join_list = [(eiaid_map_table_name, map_baseline_column, map_eiaid_column)]
annual_agg = self._agg.aggregate_annual(enduses=[], group_by=[eiaid_col],
restrict=restrict,
join_list=join_list,
weights=['weight'],
sort=True)
        # Cache the result per database/mapping-table so later calls can reuse it
        self._cache['eiaids'][cache_key] = annual_agg['eiaid'].to_numpy(dtype=str).tolist()
        return self._cache['eiaids'][cache_key]
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def get_buildings_by_eiaids(self, eiaids: List[str], get_query_only: bool = False):
"""
Returns the list of buildings belonging to the given list of utilities.
Args:
eiaids: list of utility EIAIDs
mapping_version: Version of eiaid mapping to use. After the spatial refactor upgrade, version two
should be used
get_query_only: If set to true, returns the query string instead of the result
Returns:
Pandas dataframe consisting of the building ids belonging to the provided list of utilities.
"""
eiaid_map_table_name, map_baseline_column, map_eiaid_column = self.get_eiaid_map()
query = sa.select([self._bsq.bs_bldgid_column.distinct()])
query = self._bsq._add_join(query, [(eiaid_map_table_name, map_baseline_column, map_eiaid_column)])
query = self._bsq._add_restrict(query, [("eiaid", eiaids)])
query = query.where(self._bsq._get_column("weight") > 0)
if get_query_only:
return self._bsq._compile(query)
res = self._bsq.execute(query)
return res
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def get_locations_by_eiaids(self, eiaids: List[str], get_query_only: bool = False):
"""
Returns the list of locations/counties (depends on mapping version) belonging to a given list of utilities.
Args:
eiaids: list of utility EIAIDs
mapping_version: Version of eiaid mapping to use. After the spatial refactor upgrade, version two
should be used
get_query_only: If set to true, returns the query string instead of the result
Returns:
            List of locations (for version 1) or counties (for version 2) belonging to the
            provided list of utilities.
"""
eiaid_map_table_name, map_baseline_column, map_eiaid_column = self.get_eiaid_map()
eiaid_map_table = self._bsq._get_table(eiaid_map_table_name)
query = sa.select([eiaid_map_table.c[map_eiaid_column].distinct()])
query = self._bsq._add_restrict(query, [("eiaid", eiaids)])
query = query.where(eiaid_map_table.c["weight"] > 0)
if get_query_only:
return self._bsq._compile(query)
res = self._bsq.execute(query)
return list(res[map_eiaid_column].values)
def get_rate_map(self, weekend_csv_path: str, weekday_csv_path: str) -> dict[tuple[int, int, int], float]:
def read_rate_file(file_path: str) -> pd.DataFrame:
df = read_csv(file_path)
if len(df) != 12:
raise ValueError(f"Invalid number of rows in {file_path}. Expected 12, got {len(df)}")
if len(df.columns) != 25:
raise ValueError(f"Invalid number of columns in {file_path}. Expected 25, got {len(df.columns)}")
if 'month' != df.columns[0]:
raise ValueError(f"Invalid column names in {file_path}. Expected first column to be 'month'")
df = df.set_index('month')
df.index = pd.Index(range(1, 13), name='month')
df.columns = pd.Index(range(0, 24), name="Hour")
return df
weekday_rate = read_rate_file(weekday_csv_path)
weekend_rate = read_rate_file(weekend_csv_path)
weekday_rate['weekend'] = 0
weekend_rate['weekend'] = 1
full_rate = pd.concat([weekday_rate, weekend_rate])
full_rate = full_rate.reset_index().melt(id_vars=['month', 'weekend'],
value_vars=range(0, 24), var_name='hour', value_name='rate')
rate_map = full_rate.set_index(['month', 'weekend', 'hour'])['rate'].to_dict()
return rate_map
@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def calculate_tou_bill(self, *,
rate_map: Union[tuple[str, str], dict[tuple[int, int, int], float]],
meter_col: Optional[Union[AnyColType, tuple[AnyColType, ...]]] = None,
group_by: Sequence[Union[AnyColType, tuple[str, str]]] = Field(default_factory=list),
upgrade_id: Union[int, str] = '0',
sort: bool = True,
join_list: Sequence[tuple[AnyTableType, AnyColType,
AnyColType]] = Field(default_factory=list),
weights: Sequence[Union[str, tuple]] = Field(default_factory=list),
restrict: Sequence[
tuple[AnyColType,
Union[str, int, Sequence[Union[int, str]]]]] = Field(default_factory=list),
collapse_ts: bool = False,
timestamp_grouping_func: Optional[Literal["month", "day", "hour"]] = "month",
limit: Optional[int] = None,
get_query_only: bool = False
):
if isinstance(rate_map, tuple):
rate_map = self.get_rate_map(*rate_map)
user_rate = TOURate(rate_map)
if self._bsq.ts_table is None:
raise ValueError("No timeseries table found")
TOU_enduse = {}
if meter_col is None:
TOU_enduse["fuel_use__electricity__net__kwh__TOU"] =\
self._bsq.ts_table.c['fuel_use__electricity__total__kwh'] +\
safunc.coalesce(self._bsq.ts_table.c['end_use__electricity__pv__kwh'], 0)
else:
if isinstance(meter_col, tuple):
for col in meter_col:
TOU_enduse[f"{col}__TOU"] = self._bsq._get_column(col)
else:
TOU_enduse[f"{meter_col}__TOU"] = self._bsq._get_column(meter_col)
month_col, is_weekend_col, hour_col = (self._bsq._get_special_column(col) for col in
("month", "is_weekend", "hour"))
rate_col = MappedColumn(bsq=self._bsq, name="tou_rate", mapping_dict=user_rate.raw_dict,
key=(month_col, is_weekend_col, hour_col))
enduses_list = []
for col in TOU_enduse:
enduses_list.append((TOU_enduse[col] * rate_col / 100).label(f"{col}__dollars"))
ts_query = TSQuery(enduses=enduses_list,
group_by=group_by,
upgrade_id=str(upgrade_id),
sort=sort,
join_list=join_list,
weights=weights,
restrict=restrict,
collapse_ts=collapse_ts,
timestamp_grouping_func=timestamp_grouping_func,
limit=limit,
get_query_only=get_query_only
)
return self._agg.aggregate_timeseries(params=ts_query)
Classes
class BuildStockUtility (buildstock_query: main.BuildStockQuery, eia_mapping_year: int = 2018, eia_mapping_version: int = 1)
-
Class to perform electric utility centric queries.
Args
eia_mapping_year
- The year of the EIA form 861 service territory map to use when mapping to utility service territories. Currently, only 2018 and 2012 are valid years.
eia_mapping_version
- The EIA mapping version to use.
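A construction sketch (not from the library's own documentation): it assumes an existing BuildStockQuery object, and the workgroup, db_name, and table_name arguments shown for BuildStockQuery are assumptions about that class's constructor with placeholder values.
from buildstock_query.main import BuildStockQuery
from buildstock_query.utility_query import BuildStockUtility

# Placeholder Athena run details; substitute your own workgroup, database, and table.
bsq = BuildStockQuery(workgroup="my_workgroup", db_name="my_database", table_name="my_run")
utility = BuildStockUtility(buildstock_query=bsq, eia_mapping_year=2018, eia_mapping_version=3)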
Methods
def aggregate_annual_by_eiaid(self, enduses: Sequence[Union[sqlalchemy.sql.elements.Label, sqlalchemy.sql.schema.Column, str, MappedColumn]], group_by: Optional[List[str]] = None, get_query_only: bool = False, get_nonzero_count: bool = False)
-
Aggregates the annual consumption in the baseline table, grouping by all the utilities
Args
enduses
- The list of enduses to aggregate
group_by
- Additional columns to group the aggregation by
mapping_version
- Version of eiaid mapping to use. After the spatial refactor upgrade, version two should be used
get_query_only
- If set to true, returns the list of queries to run instead of the result.
Returns
Pandas dataframe with the annual sum of the requested enduses, grouped by utilities
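A hypothetical usage sketch, assuming utility is a BuildStockUtility instance as constructed above; the enduse and group_by column names are placeholders that must exist in your run's annual results and characteristics tables.
annual_df = utility.aggregate_annual_by_eiaid(
    enduses=["fuel_use_electricity_total_m_btu"],  # assumed annual enduse column name
    group_by=["state"],                            # assumed characteristic column name
)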
def aggregate_ts_by_eiaid(self, params: UtilityTSQuery)
-
Aggregates the timeseries result, grouping by utilities.
Args
eiaid_list
- The list of utility ids (EIAID) assigned by EIA.
enduses
- The list of enduses to aggregate
group_by
- Additional columns to group the aggregation by
mapping_version
- Version of eiaid mapping to use. After the spatial refactor upgrade, version two should be used
get_query_only
- If set to true, returns the list of queries to run instead of the result.
query_group_size
- The number of eiaids to be grouped together when running Athena queries. This should be set as large as possible without causing query timeouts.
split_enduses
- Query each enduse separately to spread the load on Athena
Returns
Pandas dataframe with the aggregated timeseries and the requested enduses grouped by utilities
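A hypothetical sketch, assuming utility as above; the EIAIDs and group_by column are placeholders, and the enduse name is the timeseries electricity column referenced elsewhere in this module.
ts_df = utility.aggregate_ts_by_eiaid(
    eiaid_list=["4110", "14328"],                   # placeholder EIAIDs
    enduses=["fuel_use__electricity__total__kwh"],  # timeseries column used by this module
    group_by=["state"],                             # assumed characteristic column name
    query_group_size=25,                            # batch 25 EIAIDs per Athena query
)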
def aggregate_unit_counts_by_eiaid(self, *, eiaid_list: list[str], group_by: list[typing.Union[sqlalchemy.sql.elements.Label, sqlalchemy.sql.schema.Column, str, MappedColumn, tuple[str, str]]] = [], get_query_only: bool = False)
-
Returns the counts of dwelling units, grouped by eiaid and any additional group_by columns provided.
Args
eiaid_list
- The list of utility ids (EIAID) to aggregate for
group_by
- Additional columns to group by
mapping_version
- Version of eiaid mapping to use. After the spatial refactor upgrade, version two should be used
get_query_only
- If set to true, returns the query instead of the result
Returns
Pandas dataframe with the unit counts
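A hypothetical sketch, assuming utility as above; the EIAIDs and group_by column name are placeholders.
counts_df = utility.aggregate_unit_counts_by_eiaid(
    eiaid_list=["4110", "14328"],              # placeholder EIAIDs
    group_by=["geometry_building_type_recs"],  # assumed characteristic column name
)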
def calculate_tou_bill(self, *, rate_map: Union[tuple[str, str], dict[tuple[int, int, int], float]], meter_col: Union[sqlalchemy.sql.elements.Label, sqlalchemy.sql.schema.Column, str, MappedColumn, tuple[Union[sqlalchemy.sql.elements.Label, sqlalchemy.sql.schema.Column, str, MappedColumn], ...], None] = None, group_by: Sequence[Union[sqlalchemy.sql.elements.Label, sqlalchemy.sql.schema.Column, str, MappedColumn, tuple[str, str]]] = [], upgrade_id: Union[str, int] = '0', sort: bool = True, join_list: Sequence[tuple[Union[sqlalchemy.sql.schema.Table, str, sqlalchemy.sql.selectable.Subquery], Union[sqlalchemy.sql.elements.Label, sqlalchemy.sql.schema.Column, str, MappedColumn], Union[sqlalchemy.sql.elements.Label, sqlalchemy.sql.schema.Column, str, MappedColumn]]] = [], weights: Sequence[Union[str, tuple]] = [], restrict: Sequence[tuple[Union[sqlalchemy.sql.elements.Label, sqlalchemy.sql.schema.Column, str, MappedColumn], Union[str, int, Sequence[Union[int, str]]]]] = [], collapse_ts: bool = False, timestamp_grouping_func: Optional[Literal['month', 'day', 'hour']] = 'month', limit: Optional[int] = None, get_query_only: bool = False)
-
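This method has no docstring; the summary and example below are inferred from the implementation and should be treated as a sketch rather than authoritative documentation. The method prices timeseries electricity use against a time-of-use (TOU) rate: rate_map is either a dict keyed by (month, is_weekend, hour) tuples or a (weekend_csv_path, weekday_csv_path) pair that is passed to get_rate_map. When meter_col is omitted, the metered quantity defaults to fuel_use__electricity__total__kwh plus the end_use__electricity__pv__kwh column (net electricity). The rate is divided by 100 before multiplying, which suggests rates are expressed in cents per kWh, and the output columns are labelled with a __TOU__dollars suffix. A usage sketch, assuming utility as above and that the run has a timeseries table; the group_by column name is a placeholder.
# Flat 12 cents/kWh for every (month, is_weekend, hour) combination;
# a real TOU tariff would vary the value by key.
flat_rate = {
    (month, is_weekend, hour): 12.0
    for month in range(1, 13)
    for is_weekend in (0, 1)
    for hour in range(24)
}
bill_df = utility.calculate_tou_bill(
    rate_map=flat_rate,
    group_by=["state"],               # assumed characteristic column name
    upgrade_id="0",                   # baseline
    timestamp_grouping_func="month",  # one row per month per group
)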
def get_buildings_by_eiaids(self, eiaids: List[str], get_query_only: bool = False)
-
Returns the list of buildings belonging to the given list of utilities.
Args
eiaids
- list of utility EIAIDs
mapping_version
- Version of eiaid mapping to use. After the spatial refactor upgrade, version two should be used
get_query_only
- If set to true, returns the query string instead of the result
Returns
Pandas dataframe consisting of the building ids belonging to the provided list of utilities.
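A hypothetical sketch, assuming utility as above; the EIAIDs are placeholders.
bldg_df = utility.get_buildings_by_eiaids(eiaids=["4110", "14328"])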
def get_eiaid_map(self) ‑> tuple[str, str, str]
-
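This method has no docstring; from the implementation it returns the mapping table name and the two join columns corresponding to the eia_mapping_version configured on the instance. A trivial sketch, assuming utility as above:
map_table, baseline_col, eiaid_col = utility.get_eiaid_map()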
def get_eiaids(self, restrict: Optional[List[Tuple[str, List]]] = None) ‑> list[str]
-
Returns the list of eiaids
Args
restrict
- The list of where conditions to restrict the results to. It should be specified as a list of tuples.
Example:
[('state',['VA','AZ']), ("build_existing_model.lighting",['60% CFL']), ...]
mapping_version
- Version of eiaid mapping to use. After the spatial refactor upgrade, version two should be used
Returns
The list of eiaids (as strings) matching the provided restriction.
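A sketch using the restrict format shown in the docstring, assuming utility as above:
eiaids = utility.get_eiaids(restrict=[("state", ["VA", "AZ"])])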
def get_filtered_results_csv_by_eiaid(self, eiaids: List[str], get_query_only: bool = False)
-
Returns the portion of the results csv that belongs to the given list of utilities.
Args
eiaids
- The list of utility EIAIDs
mapping_version
- Version of eiaid mapping to use. After the spatial refactor upgrade, version two should be used
get_query_only
- If set to true, returns the list of queries to run instead of the result.
Returns
Pandas dataframe that is the subset of the results csv belonging to the provided list of utilities
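A hypothetical sketch, assuming utility as above; the EIAID is a placeholder.
results_df = utility.get_filtered_results_csv_by_eiaid(eiaids=["4110"])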
def get_locations_by_eiaids(self, eiaids: List[str], get_query_only: bool = False)
-
Returns the list of locations/counties (depends on mapping version) belonging to a given list of utilities.
Args
eiaids
- list of utility EIAIDs
mapping_version
- Version of eiaid mapping to use. After the spatial refactor upgrade, version two should be used
get_query_only
- If set to true, returns the query string instead of the result
Returns
List of locations (for version 1) or counties (for version 2) belonging to the provided list of utilities.
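A hypothetical sketch, assuming utility as above; the EIAIDs are placeholders and the return value is a plain list of location/county identifiers.
counties = utility.get_locations_by_eiaids(eiaids=["4110", "14328"])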
def get_rate_map(self, weekend_csv_path: str, weekday_csv_path: str) ‑> dict[tuple[int, int, int], float]
-
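This method has no docstring; from the implementation it reads two rate CSVs (weekend path first, weekday path second), each with 12 rows (one per month), a leading 'month' column, and 24 hourly rate columns, and flattens them into a dict keyed by (month, is_weekend, hour). A sketch that builds matching placeholder files and reads them back, assuming utility as above:
import pandas as pd

rates = pd.DataFrame({"month": range(1, 13)})
for hour in range(24):
    rates[hour] = 12.0  # flat placeholder rate for every hour of every month
rates.to_csv("weekday_rates.csv", index=False)
rates.to_csv("weekend_rates.csv", index=False)

rate_map = utility.get_rate_map("weekend_rates.csv", "weekday_rates.csv")
# rate_map[(1, 0, 3)] -> the weekday rate for January, 3 AM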
class TOURate (rate_dict: dict[typing.Tuple[int, int, int], float])
-
Validated time-of-use rate map. The constructor takes a dict keyed by (month, is_weekend, hour) tuples and raises a ValueError if any key is out of range.
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var data : dict[TimeTuple, float]
var raw_dict : dict[typing.Tuple[int, int, int], float]
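A construction sketch based on the example values in the field definitions: the constructor takes a plain tuple-keyed rate dict and validates each key.
rate = TOURate({(1, 0, 3): 0.5})  # January, weekday, 3 AM -> 0.5
rate.data                         # keys converted to validated TimeTuple objects
rate.raw_dict                     # the original tuple-keyed dict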
class TimeTuple (**data: Any)
-
A validated (month, is_weekend, hour) combination: month is 1-12, is_weekend is 0 or 1, and hour is 0-23.
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var hour : int
var is_weekend : int
var month : int