Source code for reV.supply_curve.aggregation

# -*- coding: utf-8 -*-
"""
reV aggregation framework.
"""
from abc import ABC, abstractmethod
import h5py
import logging
import numpy as np
import os
import pandas as pd

from reV.handlers.outputs import Outputs
from reV.handlers.exclusions import ExclusionLayers
from reV.supply_curve.exclusions import ExclusionMaskFromDict
from reV.supply_curve.extent import SupplyCurveExtent
from reV.supply_curve.tech_mapping import TechMapping
from reV.supply_curve.points import AggregationSupplyCurvePoint
from reV.utilities.exceptions import (EmptySupplyCurvePointError,
                                      FileInputError, SupplyCurveInputError)
from reV.utilities import log_versions

from rex.resource import Resource
from rex.utilities.execution import SpawnProcessPool
from rex.utilities.loggers import log_mem

logger = logging.getLogger(__name__)


[docs]class AbstractAggFileHandler(ABC): """Simple framework to handle aggregation file context managers.""" def __init__(self, excl_fpath, excl_dict=None, area_filter_kernel='queen', min_area=None): """ Parameters ---------- excl_fpath : str | list | tuple Filepath to exclusions h5 with techmap dataset (can be one or more filepaths). excl_dict : dict | None Dictionary of exclusion keyword arugments of the format {layer_dset_name: {kwarg: value}} where layer_dset_name is a dataset in the exclusion h5 file and kwarg is a keyword argument to the reV.supply_curve.exclusions.LayerMask class. by default None area_filter_kernel : str, optional Contiguous area filter method to use on final exclusions mask, by default 'queen' min_area : float, optional Minimum required contiguous area filter in sq-km, by default None """ self._excl_fpath = excl_fpath self._excl = ExclusionMaskFromDict(excl_fpath, layers_dict=excl_dict, min_area=min_area, kernel=area_filter_kernel) def __enter__(self): return self def __exit__(self, type, value, traceback): self.close() if type is not None: raise
[docs] @abstractmethod def close(self): """Close all file handlers.""" self._excl.close()
@property def exclusions(self): """Get the exclusions file handler object. Returns ------- _excl : ExclusionMask Exclusions h5 handler object. """ return self._excl @property def h5(self): """ Placeholder for h5 Resource handler """
[docs]class AggFileHandler(AbstractAggFileHandler): """ Framework to handle aggregation file context manager: - exclusions .h5 file - h5 file to be aggregated """ DEFAULT_H5_HANDLER = Resource def __init__(self, excl_fpath, h5_fpath, excl_dict=None, area_filter_kernel='queen', min_area=None, h5_handler=None): """ Parameters ---------- excl_fpath : str | list | tuple Filepath to exclusions h5 with techmap dataset (can be one or more filepaths). h5_fpath : str Filepath to .h5 file to be aggregated excl_dict : dict | None Dictionary of exclusion keyword arugments of the format {layer_dset_name: {kwarg: value}} where layer_dset_name is a dataset in the exclusion h5 file and kwarg is a keyword argument to the reV.supply_curve.exclusions.LayerMask class. by default None area_filter_kernel : str, optional Contiguous area filter method to use on final exclusions mask, by default 'queen' min_area : float, optional Minimum required contiguous area filter in sq-km, by default None h5_handler : rex.Resource | None Optional special handler similar to the rex.Resource handler which is default. """ super().__init__(excl_fpath, excl_dict=excl_dict, area_filter_kernel=area_filter_kernel, min_area=min_area) if h5_handler is None: self._h5 = Resource(h5_fpath) else: self._h5 = h5_handler(h5_fpath) @property def h5(self): """ Get the h5 file handler object. Returns ------- _h5 : Outputs reV h5 outputs handler object. """ return self._h5
[docs] def close(self): """Close all file handlers.""" self._excl.close() self._h5.close()
[docs]class BaseAggregation(ABC): """Abstract supply curve points aggregation framework based on only an exclusion file and techmap.""" def __init__(self, excl_fpath, tm_dset, excl_dict=None, area_filter_kernel='queen', min_area=None, resolution=64, excl_area=None, res_fpath=None, gids=None, pre_extract_inclusions=False): """ Parameters ---------- excl_fpath : str | list | tuple Filepath to exclusions h5 with techmap dataset (can be one or more filepaths). tm_dset : str Dataset name in the techmap file containing the exclusions-to-resource mapping data. excl_dict : dict | None Dictionary of exclusion keyword arugments of the format {layer_dset_name: {kwarg: value}} where layer_dset_name is a dataset in the exclusion h5 file and kwarg is a keyword argument to the reV.supply_curve.exclusions.LayerMask class. by default None area_filter_kernel : str, optional Contiguous area filter method to use on final exclusions mask, by default "queen" min_area : float, optional Minimum required contiguous area filter in sq-km, by default None resolution : int, optional SC resolution, must be input in combination with gid. Prefered option is to use the row/col slices to define the SC point instead, by default None excl_area : float, optional Area of an exclusion pixel in km2. None will try to infer the area from the profile transform attribute in excl_fpath, by default None gids : list, optional List of supply curve point gids to get summary for (can use to subset if running in parallel), or None for all gids in the SC extent, by default None pre_extract_inclusions : bool, optional Optional flag to pre-extract/compute the inclusion mask from the provided excl_dict, by default False. Typically faster to compute the inclusion mask on the fly with parallel workers. """ self._excl_fpath = excl_fpath self._tm_dset = tm_dset self._excl_dict = excl_dict self._resolution = resolution self._area_filter_kernel = area_filter_kernel self._min_area = min_area self._res_fpath = res_fpath self._gids = gids self._pre_extract_inclusions = pre_extract_inclusions self._excl_area = self._get_excl_area(excl_fpath, excl_area=excl_area) self._shape = None self._validate_tech_mapping() if pre_extract_inclusions: self._inclusion_mask = \ ExclusionMaskFromDict.extract_inclusion_mask( excl_fpath, tm_dset, excl_dict=excl_dict, area_filter_kernel=area_filter_kernel, min_area=min_area) else: self._inclusion_mask = None def _validate_tech_mapping(self): """Check that tech mapping exists and create it if it doesn't""" with ExclusionLayers(self._excl_fpath) as f: dsets = f.h5.dsets excl_fp_is_str = isinstance(self._excl_fpath, str) tm_in_excl = self._tm_dset in dsets if tm_in_excl: logger.info('Found techmap "{}".'.format(self._tm_dset)) elif not tm_in_excl and not excl_fp_is_str: msg = ('Could not find techmap dataset "{}" and cannot run ' 'techmap with arbitrary multiple exclusion filepaths ' 'to write to: {}'.format(self._tm_dset, self._excl_fpath)) logger.error(msg) raise RuntimeError(msg) else: logger.info('Could not find techmap "{}". Running techmap module.' .format(self._tm_dset)) try: TechMapping.run(self._excl_fpath, self._res_fpath, dset=self._tm_dset) except Exception as e: msg = ('TechMapping process failed. Received the ' 'following error:\n{}'.format(e)) logger.exception(msg) raise RuntimeError(msg) from e @property def gids(self): """ 1D array of supply curve point gids to aggregate Returns ------- ndarray """ if self._gids is None: with SupplyCurveExtent(self._excl_fpath, resolution=self._resolution) as sc: self._gids = sc.valid_sc_points(self._tm_dset) elif np.issubdtype(type(self._gids), np.number): self._gids = np.array([self._gids]) elif not isinstance(self._gids, np.ndarray): self._gids = np.array(self._gids) return self._gids @property def shape(self): """Get the shape of the full exclusions raster. Returns ------- tuple """ if self._shape is None: with SupplyCurveExtent(self._excl_fpath, resolution=self._resolution) as sc: self._shape = sc.exclusions.shape return self._shape @staticmethod def _get_excl_area(excl_fpath, excl_area=None): """ Get exclusion area from excl_fpath pixel area. Confirm that the exclusion area is not None. Parameters ---------- excl_fpath : str | list | tuple Filepath to exclusions h5 with techmap dataset (can be one or more filepaths). excl_area : float, optional Area of an exclusion pixel in km2. None will try to infer the area from the profile transform attribute in excl_fpath, by default None Returns ------- excl_area : float Area of an exclusion pixel in km2 """ if excl_area is None: logger.debug('Setting the exclusion area from the area of a pixel ' 'in {}'.format(excl_fpath)) with ExclusionLayers(excl_fpath) as excl: excl_area = excl.pixel_area if excl_area is None: e = ('No exclusion pixel area was input and could not parse ' 'area from the exclusion file attributes!') logger.error(e) raise SupplyCurveInputError(e) return excl_area @staticmethod def _check_inclusion_mask(inclusion_mask, gids, excl_shape): """ Check inclusion mask to ensure it has the proper shape Parameters ---------- inclusion_mask : np.ndarray | dict | optional 2D array pre-extracted inclusion mask where 1 is included and 0 is excluded. This must be either match the full exclusion shape or be a dict lookup of single-sc-point exclusion masks corresponding to the gids input and keyed by gids, by default None which will calculate exclusions on the fly for each sc point. gids : list | ndarray sc point gids corresponding to inclusion mask excl_shape : tuple Full exclusion layers shape """ if isinstance(inclusion_mask, dict): assert len(inclusion_mask) == len(gids) elif isinstance(inclusion_mask, np.ndarray): assert inclusion_mask.shape == excl_shape elif inclusion_mask is not None: msg = ('Expected inclusion_mask to be dict or array but received ' '{}'.format(type(inclusion_mask))) logger.error(msg) raise SupplyCurveInputError(msg) @staticmethod def _get_gid_inclusion_mask(inclusion_mask, gid, slice_lookup, resolution=64): """ Get inclusion mask for desired gid Parameters ---------- inclusion_mask : np.ndarray | dict | optional 2D array pre-extracted inclusion mask where 1 is included and 0 is excluded. This must be either match the full exclusion shape or be a dict lookup of single-sc-point exclusion masks corresponding to the gids input and keyed by gids, by default None which will calculate exclusions on the fly for each sc point. gid : int sc_point_gid value, used to extract inclusion mask from 2D inclusion array slice_lookup : dict Mapping of sc_point_gids to exclusion/inclusion row and column slices resolution : int, optional supply curve extent resolution, by default 64 Returns ------- gid_inclusions : ndarray | None 2D array of inclusions for desired gid, normalized from 0, excluded to 1 fully included, if inclusion mask is None gid_inclusions is None """ gid_inclusions = None if isinstance(inclusion_mask, dict): gid_inclusions = inclusion_mask[gid] assert gid_inclusions.shape[0] <= resolution assert gid_inclusions.shape[1] <= resolution elif isinstance(inclusion_mask, np.ndarray): row_slice, col_slice = slice_lookup[gid] gid_inclusions = inclusion_mask[row_slice, col_slice] elif inclusion_mask is not None: msg = ('Expected inclusion_mask to be dict or array but received ' '{}'.format(type(inclusion_mask))) logger.error(msg) raise SupplyCurveInputError(msg) return gid_inclusions @staticmethod def _parse_gen_index(gen_fpath): """Parse gen outputs for an array of generation gids corresponding to the resource gids. Parameters ---------- gen_fpath : str Filepath to reV generation output .h5 file. This can also be a csv filepath to a project points input file. Returns ------- gen_index : np.ndarray Array of generation gids with array index equal to resource gid. Array value is -1 if the resource index was not used in the generation run. """ if gen_fpath.endswith('.h5'): with Resource(gen_fpath) as f: gen_index = f.meta elif gen_fpath.endswith('.csv'): gen_index = pd.read_csv(gen_fpath) else: msg = ('Could not recognize gen_fpath input, needs to be reV gen ' 'output h5 or project points csv but received: {}' .format(gen_fpath)) logger.error(msg) raise FileInputError(msg) if 'gid' in gen_index: gen_index = gen_index.rename(columns={'gid': 'res_gids'}) gen_index['gen_gids'] = gen_index.index gen_index = gen_index[['res_gids', 'gen_gids']] gen_index = gen_index.set_index(keys='res_gids') gen_index = \ gen_index.reindex(range(int(gen_index.index.max() + 1))) gen_index = gen_index['gen_gids'].values gen_index[np.isnan(gen_index)] = -1 gen_index = gen_index.astype(np.int32) else: gen_index = None return gen_index
[docs]class Aggregation(BaseAggregation): """Concrete but generalized aggregation framework to aggregate ANY reV h5 file to a supply curve grid (based on an aggregated exclusion grid).""" def __init__(self, excl_fpath, tm_dset, *agg_dset, excl_dict=None, area_filter_kernel='queen', min_area=None, resolution=64, excl_area=None, gids=None, pre_extract_inclusions=False): """ Parameters ---------- excl_fpath : str | list | tuple Filepath to exclusions h5 with techmap dataset (can be one or more filepaths). tm_dset : str Dataset name in the techmap file containing the exclusions-to-resource mapping data. agg_dset : str Dataset to aggreate, can supply multiple datasets. The datasets should be scalar values for each site. This method cannot aggregate timeseries data. excl_dict : dict | None Dictionary of exclusion keyword arugments of the format {layer_dset_name: {kwarg: value}} where layer_dset_name is a dataset in the exclusion h5 file and kwarg is a keyword argument to the reV.supply_curve.exclusions.LayerMask class. by default None area_filter_kernel : str, optional Contiguous area filter method to use on final exclusions mask, by default "queen" min_area : float, optional Minimum required contiguous area filter in sq-km, by default None resolution : int, optional SC resolution, must be input in combination with gid. Prefered option is to use the row/col slices to define the SC point instead, by default None excl_area : float, optional Area of an exclusion pixel in km2. None will try to infer the area from the profile transform attribute in excl_fpath, by default None gids : list, optional List of supply curve point gids to get summary for (can use to subset if running in parallel), or None for all gids in the SC extent, by default None pre_extract_inclusions : bool, optional Optional flag to pre-extract/compute the inclusion mask from the provided excl_dict, by default False. Typically faster to compute the inclusion mask on the fly with parallel workers. """ log_versions(logger) logger.info('Initializing Aggregation...') logger.debug('Exclusion filepath: {}'.format(excl_fpath)) logger.debug('Exclusion dict: {}'.format(excl_dict)) super().__init__(excl_fpath, tm_dset, excl_dict=excl_dict, area_filter_kernel=area_filter_kernel, min_area=min_area, resolution=resolution, excl_area=excl_area, gids=gids, pre_extract_inclusions=pre_extract_inclusions) if isinstance(agg_dset, str): agg_dset = (agg_dset, ) self._agg_dsets = agg_dset def _check_files(self, h5_fpath): """Do a preflight check on input files""" if not os.path.exists(self._excl_fpath): raise FileNotFoundError('Could not find required exclusions file: ' '{}'.format(self._excl_fpath)) if not os.path.exists(h5_fpath): raise FileNotFoundError('Could not find required h5 file: ' '{}'.format(h5_fpath)) with h5py.File(self._excl_fpath, 'r') as f: if self._tm_dset not in f: raise FileInputError('Could not find techmap dataset "{}" ' 'in exclusions file: {}' .format(self._tm_dset, self._excl_fpath)) with Resource(h5_fpath) as f: for dset in self._agg_dsets: if dset not in f: raise FileInputError('Could not find provided dataset "{}"' ' in h5 file: {}' .format(dset, h5_fpath))
[docs] @classmethod def run_serial(cls, excl_fpath, h5_fpath, tm_dset, *agg_dset, agg_method='mean', excl_dict=None, inclusion_mask=None, area_filter_kernel='queen', min_area=None, resolution=64, excl_area=0.0081, gids=None, gen_index=None): """ Standalone method to aggregate - can be parallelized. Parameters ---------- excl_fpath : str | list | tuple Filepath to exclusions h5 with techmap dataset (can be one or more filepaths). h5_fpath : str Filepath to .h5 file to aggregate tm_dset : str Dataset name in the techmap file containing the exclusions-to-resource mapping data. agg_dset : str Dataset to aggreate, can supply multiple datasets. The datasets should be scalar values for each site. This method cannot aggregate timeseries data. agg_method : str, optional Aggregation method, either mean or sum/aggregate, by default "mean" excl_dict : dict | None Dictionary of exclusion keyword arugments of the format {layer_dset_name: {kwarg: value}} where layer_dset_name is a dataset in the exclusion h5 file and kwarg is a keyword argument to the reV.supply_curve.exclusions.LayerMask class. by default None inclusion_mask : np.ndarray, optional 2D array pre-extracted inclusion mask where 1 is included and 0 is excluded. This must be either match the full exclusion shape or be a list of single-sc-point exclusion masks corresponding to the gids input, by default None area_filter_kernel : str, optional Contiguous area filter method to use on final exclusions mask, by default "queen" min_area : float, optional Minimum required contiguous area filter in sq-km, by default None resolution : int, optional SC resolution, must be input in combination with gid. Prefered option is to use the row/col slices to define the SC point instead, by default 0.0081 excl_area : float, optional Area of an exclusion pixel in km2. None will try to infer the area from the profile transform attribute in excl_fpath, by default None gids : list, optional List of supply curve point gids to get summary for (can use to subset if running in parallel), or None for all gids in the SC extent, by default None gen_index : np.ndarray, optional Array of generation gids with array index equal to resource gid. Array value is -1 if the resource index was not used in the generation run, by default None Returns ------- agg_out : dict Aggregated values for each aggregation dataset """ with SupplyCurveExtent(excl_fpath, resolution=resolution) as sc: exclusion_shape = sc.exclusions.shape if gids is None: gids = sc.valid_sc_points(tm_dset) elif np.issubdtype(type(gids), np.number): gids = [gids] slice_lookup = sc.get_slice_lookup(gids) cls._check_inclusion_mask(inclusion_mask, gids, exclusion_shape) # pre-extract handlers so they are not repeatedly initialized file_kwargs = {'excl_dict': excl_dict, 'area_filter_kernel': area_filter_kernel, 'min_area': min_area} dsets = agg_dset + ('meta', ) agg_out = {ds: [] for ds in dsets} with AggFileHandler(excl_fpath, h5_fpath, **file_kwargs) as fh: n_finished = 0 for gid in gids: gid_inclusions = cls._get_gid_inclusion_mask( inclusion_mask, gid, slice_lookup, resolution=resolution) try: gid_out = AggregationSupplyCurvePoint.run( gid, fh.exclusions, fh.h5, tm_dset, *agg_dset, agg_method=agg_method, excl_dict=excl_dict, inclusion_mask=gid_inclusions, resolution=resolution, excl_area=excl_area, exclusion_shape=exclusion_shape, close=False, gen_index=gen_index) except EmptySupplyCurvePointError: logger.debug('SC gid {} is fully excluded or does not ' 'have any valid source data!'.format(gid)) except Exception as e: msg = 'SC gid {} failed!'.format(gid) logger.exception(msg) raise RuntimeError(msg) from e else: n_finished += 1 logger.debug('Serial aggregation: ' '{} out of {} points complete' .format(n_finished, len(gids))) log_mem(logger) for k, v in gid_out.items(): agg_out[k].append(v) return agg_out
[docs] def run_parallel(self, h5_fpath, agg_method='mean', excl_area=None, max_workers=None, sites_per_worker=100): """ Aggregate in parallel Parameters ---------- h5_fpath : str Filepath to .h5 file to aggregate agg_method : str, optional Aggregation method, either mean or sum/aggregate, by default "mean" excl_area : float, optional Area of an exclusion cell (square km), by default None max_workers : int, optional Number of cores to run summary on. None is all available cpus, by default None sites_per_worker : int, optional Number of SC points to process on a single parallel worker, by default 100 Returns ------- agg_out : dict Aggregated values for each aggregation dataset """ self._check_files(h5_fpath) gen_index = self._parse_gen_index(h5_fpath) slice_lookup = None chunks = int(np.ceil(len(self.gids) / sites_per_worker)) chunks = np.array_split(self.gids, chunks) if self._inclusion_mask is not None: with SupplyCurveExtent(self._excl_fpath, resolution=self._resolution) as sc: assert sc.exclusions.shape == self._inclusion_mask.shape slice_lookup = sc.get_slice_lookup(self.gids) logger.info('Running supply curve point aggregation for ' 'points {} through {} at a resolution of {} ' 'on {} cores in {} chunks.' .format(self.gids[0], self.gids[-1], self._resolution, max_workers, len(chunks))) n_finished = 0 futures = [] dsets = self._agg_dsets + ('meta', ) agg_out = {ds: [] for ds in dsets} loggers = [__name__, 'reV.supply_curve.points', 'reV'] with SpawnProcessPool(max_workers=max_workers, loggers=loggers) as exe: # iterate through split executions, submitting each to worker for gid_set in chunks: # submit executions and append to futures list chunk_incl_masks = None if self._inclusion_mask is not None: chunk_incl_masks = {} for gid in gid_set: rs, cs = slice_lookup[gid] chunk_incl_masks[gid] = self._inclusion_mask[rs, cs] # submit executions and append to futures list futures.append(exe.submit( self.run_serial, self._excl_fpath, h5_fpath, self._tm_dset, *self._agg_dsets, agg_method=agg_method, excl_dict=self._excl_dict, inclusion_mask=chunk_incl_masks, area_filter_kernel=self._area_filter_kernel, min_area=self._min_area, resolution=self._resolution, excl_area=excl_area, gids=gid_set, gen_index=gen_index)) # gather results for future in futures: n_finished += 1 logger.info('Parallel aggregation futures collected: ' '{} out of {}' .format(n_finished, len(chunks))) for k, v in future.result().items(): if v: agg_out[k].extend(v) return agg_out
[docs] def aggregate(self, h5_fpath, agg_method='mean', max_workers=None, sites_per_worker=100): """ Aggregate with given agg_method Parameters ---------- h5_fpath : str Filepath to .h5 file to aggregate agg_method : str, optional Aggregation method, either mean or sum/aggregate, by default "mean" max_workers : int, optional Number of cores to run summary on. None is all available cpus, by default None sites_per_worker : int, optional Number of SC points to process on a single parallel worker, by default 100 Returns ------- agg : dict Aggregated values for each aggregation dataset """ if max_workers is None: max_workers = os.cpu_count() if max_workers == 1: self._check_files(h5_fpath) gen_index = self._parse_gen_index(h5_fpath) agg = self.run_serial(self._excl_fpath, h5_fpath, self._tm_dset, *self._agg_dsets, agg_method=agg_method, excl_dict=self._excl_dict, gids=self.gids, inclusion_mask=self._inclusion_mask, area_filter_kernel=self._area_filter_kernel, min_area=self._min_area, resolution=self._resolution, excl_area=self._excl_area, gen_index=gen_index) else: agg = self.run_parallel(h5_fpath=h5_fpath, agg_method=agg_method, excl_area=self._excl_area, max_workers=max_workers, sites_per_worker=sites_per_worker) if not agg['meta']: e = ('Supply curve aggregation found no non-excluded SC points. ' 'Please check your exclusions or subset SC GID selection.') logger.error(e) raise EmptySupplyCurvePointError(e) for k, v in agg.items(): if k == 'meta': v = pd.concat(v, axis=1).T v = v.sort_values('sc_point_gid') v = v.reset_index(drop=True) v.index.name = 'sc_gid' agg[k] = v else: v = np.dstack(v)[0] if v.shape[0] == 1: v = v.flatten() agg[k] = v return agg
[docs] @staticmethod def save_agg_to_h5(h5_fpath, out_fpath, aggregation): """ Save aggregated data to disc in .h5 format Parameters ---------- out_fpath : str Output .h5 file path aggregation : dict Aggregated values for each aggregation dataset """ agg_out = aggregation.copy() meta = agg_out.pop('meta').reset_index() for c in meta.columns: try: meta[c] = pd.to_numeric(meta[c]) except (ValueError, TypeError): pass dsets = [] shapes = {} attrs = {} chunks = {} dtypes = {} time_index = None with Resource(h5_fpath) as f: for dset, data in agg_out.items(): dsets.append(dset) shape = data.shape shapes[dset] = shape if len(data.shape) == 2: if ('time_index' in f) and (shape[0] == f.shape[0]): if time_index is None: time_index = f.time_index attrs[dset] = f.get_attrs(dset=dset) _, dtype, chunk = f.get_dset_properties(dset) chunks[dset] = chunk dtypes[dset] = dtype Outputs.init_h5(out_fpath, dsets, shapes, attrs, chunks, dtypes, meta, time_index=time_index) with Outputs(out_fpath, mode='a') as out: for dset, data in agg_out.items(): out[dset] = data
[docs] @classmethod def run(cls, excl_fpath, h5_fpath, tm_dset, *agg_dset, excl_dict=None, area_filter_kernel='queen', min_area=None, resolution=64, excl_area=None, gids=None, pre_extract_inclusions=False, agg_method='mean', max_workers=None, sites_per_worker=100, out_fpath=None): """Get the supply curve points aggregation summary. Parameters ---------- excl_fpath : str | list | tuple Filepath to exclusions h5 with techmap dataset (can be one or more filepaths). h5_fpath : str Filepath to .h5 file to aggregate tm_dset : str Dataset name in the techmap file containing the exclusions-to-resource mapping data. agg_dset : str Dataset to aggreate, can supply multiple datasets. The datasets should be scalar values for each site. This method cannot aggregate timeseries data. excl_dict : dict | None Dictionary of exclusion keyword arugments of the format {layer_dset_name: {kwarg: value}} where layer_dset_name is a dataset in the exclusion h5 file and kwarg is a keyword argument to the reV.supply_curve.exclusions.LayerMask class. by default None area_filter_kernel : str, optional Contiguous area filter method to use on final exclusions mask, by default "queen" min_area : float, optional Minimum required contiguous area filter in sq-km, by default None resolution : int, optional SC resolution, must be input in combination with gid. Prefered option is to use the row/col slices to define the SC point instead, by default None excl_area : float, optional Area of an exclusion pixel in km2. None will try to infer the area from the profile transform attribute in excl_fpath, by default None gids : list, optional List of supply curve point gids to get summary for (can use to subset if running in parallel), or None for all gids in the SC extent, by default None pre_extract_inclusions : bool, optional Optional flag to pre-extract/compute the inclusion mask from the provided excl_dict, by default False. Typically faster to compute the inclusion mask on the fly with parallel workers. agg_method : str, optional Aggregation method, either mean or sum/aggregate, by default "mean" max_workers : int, optional Number of cores to run summary on. None is all available cpus, by default None sites_per_worker : int, optional Number of SC points to process on a single parallel worker, by default 100 out_fpath : str, optional Output .h5 file path, by default None Returns ------- agg : dict Aggregated values for each aggregation dataset """ agg = cls(excl_fpath, tm_dset, *agg_dset, excl_dict=excl_dict, area_filter_kernel=area_filter_kernel, min_area=min_area, resolution=resolution, excl_area=excl_area, gids=gids, pre_extract_inclusions=pre_extract_inclusions) aggregation = agg.aggregate(h5_fpath=h5_fpath, agg_method=agg_method, max_workers=max_workers, sites_per_worker=sites_per_worker) if out_fpath is not None: agg.save_agg_to_h5(h5_fpath, out_fpath, aggregation) return aggregation