Source code for reVX.hybrid_stats.temporal_agg

# -*- coding: utf-8 -*-
"""
Module to temporally aggregate solar and wind CF profiles
"""
import logging
import os

import numpy as np
import pandas as pd

from reVX.handlers.outputs import Outputs
from rex.resource import Resource
from rex.utilities.execution import SpawnProcessPool
from rex.utilities.utilities import slice_sites, roll_timeseries

logger = logging.getLogger(__name__)


def get_time_index(h5_path, year=None):
    """
    Extract time_index from .h5 file. If 'year' is provided, extract the
    time_index for the given year.

    Parameters
    ----------
    h5_path : str
        Path to .h5 file to extract time_index from
    year : str | int, optional
        Year to extract time-index for if running on a multi-year file,
        by default None

    Returns
    -------
    time_index : pandas.DatetimeIndex
        Datetime Index
    """
    with Resource(h5_path) as f:
        if 'time_index' in f:
            time_index = f.time_index
        elif year is not None:
            time_index = f[f'time_index-{year}']
        else:
            ti_dsets = [dset for dset in f.datasets
                        if dset.startswith('time_index')]
            msg = ("'time_index' is not available in {}. The following "
                   "annual time_index datasets are available: {}. Please "
                   "specify a 'year' to use the 'time_index' for a "
                   "specific year.".format(h5_path, ti_dsets))
            logger.error(msg)
            raise ValueError(msg)

    return time_index
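
# A minimal usage sketch for ``get_time_index``; the file path and year are
# hypothetical placeholders, not files shipped with reVX:
#
# >>> from reVX.hybrid_stats.temporal_agg import get_time_index
# >>> time_index = get_time_index('./multi_year_cf.h5', year=2012)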

class DatasetAgg:
    """
    Temporally aggregate a dataset
    """
    def __init__(self, h5_fpath, dset, time_index=None, year=None,
                 local_time=False):
        """
        Parameters
        ----------
        h5_fpath : str
            Path to source h5 filepath
        dset : str
            Dataset to aggregate
        time_index : pandas.DatetimeIndex, optional
            Dataset datetime index, if None, extract from h5_fpath,
            by default None
        year : str | int, optional
            Year to extract time-index for if running on a multi-year file,
            by default None
        local_time : bool
            Flag to shift data to local time before aggregating temporal
            data. Default is to stay in UTC.
        """
        self._h5_fpath = h5_fpath
        self._dset = dset
        if time_index is None:
            time_index = get_time_index(h5_fpath, year=year)

        self._time_index = time_index
        self._year = year
        self._local_time = local_time

        with Resource(h5_fpath) as f:
            msg = '{} is not present in {}'.format(dset, h5_fpath)
            assert dset in f, msg

    def __repr__(self):
        msg = ('{} of {} in {}'
               .format(self.__class__.__name__, self._dset, self._h5_fpath))

        return msg

    @staticmethod
    def _aggregate_sites(h5_fpath, dset, site_slice=None, freq='1d',
                         method='mean', time_index=None, year=None,
                         local_time=False, **resample_kwargs):
        """
        Aggregate given sites in dataset

        Parameters
        ----------
        h5_fpath : str
            Path to source h5 filepath
        dset : str
            Dataset to aggregate
        site_slice : slice, optional
            Sites to aggregate, if None aggregate all sites,
            by default None
        freq : str, optional
            Aggregation frequency, by default '1d'
        method : str, optional
            Aggregation method, either 'mean' or 'sum', by default 'mean'
        time_index : pandas.DatetimeIndex, optional
            Dataset datetime index, if None, extract from h5_fpath,
            by default None
        year : str | int, optional
            Year to extract time-index for if running on a multi-year file,
            by default None
        local_time : bool
            Flag to shift data to local time before aggregating temporal
            data. Default is to stay in UTC.
        resample_kwargs : dict, optional
            Kwargs for pandas.DataFrame.resample

        Returns
        -------
        agg_data : ndarray
            Aggregated data for the given sites
        """
        if site_slice is None:
            site_slice = slice(None)

        if time_index is None:
            time_index = get_time_index(h5_fpath, year=year)

        with Resource(h5_fpath) as f:
            arr = f[dset, :, site_slice]
            if local_time:
                timezones = f.get_meta_arr('timezone', rows=site_slice)
                arr = roll_timeseries(arr, timezones)

        agg_data = pd.DataFrame(arr, index=time_index)
        agg_data = agg_data.resample(freq, **resample_kwargs)
        if method.lower() == 'mean':
            agg_data = agg_data.mean()
        elif method.lower() == 'sum':
            agg_data = agg_data.sum()
        else:
            msg = ('Invalid aggregation method {}, must be "mean" or '
                   '"sum"!'.format(method))
            logger.error(msg)
            raise ValueError(msg)

        return agg_data.values

    def _get_slices(self, dset, chunks_per_slice=5):
        """
        Get slices to extract

        Parameters
        ----------
        dset : str
            Dataset to aggregate
        chunks_per_slice : int, optional
            Number of chunks to extract in each slice, by default 5

        Returns
        -------
        slices : list
            List of slices to extract
        shape : tuple
            Shape of the dataset
        """
        with Resource(self._h5_fpath) as f:
            shape, _, chunks = f.get_dset_properties(dset)

        if len(shape) != 2:
            msg = ('Cannot aggregate dataset {}, as it is '
                   'not a timeseries dataset!'.format(dset))
            logger.error(msg)
            raise RuntimeError(msg)

        slices = slice_sites(shape, chunks,
                             chunks_per_slice=chunks_per_slice)

        return slices, shape

    def _create_agg_arr(self, shape, freq='1d', **resample_kwargs):
        """
        Create an empty output array for the aggregated dataset

        Parameters
        ----------
        shape : tuple
            Dataset shape
        freq : str, optional
            Aggregation frequency, by default '1d'
        resample_kwargs : dict, optional
            Kwargs for pandas.DataFrame.resample

        Returns
        -------
        out : ndarray
            Output array for aggregated dataset
        """
        out = pd.Series(0, index=self._time_index)
        out = out.resample(freq, **resample_kwargs).mean()
        shape = (out.shape[0], shape[1])

        return np.zeros(shape, dtype=np.float32)

    def aggregate(self, freq='1d', method='mean', max_workers=None,
                  chunks_per_worker=5, **resample_kwargs):
        """
        Aggregate dataset to desired frequency using desired method

        Parameters
        ----------
        freq : str, optional
            Aggregation frequency, by default '1d'
        method : str, optional
            Aggregation method, either 'mean' or 'sum', by default 'mean'
        max_workers : None | int, optional
            Number of workers to use, if 1 run in serial, if None use all
            available cores, by default None
        chunks_per_worker : int, optional
            Number of chunks to extract on each worker, by default 5
        resample_kwargs : dict, optional
            Kwargs for pandas.DataFrame.resample

        Returns
        -------
        dset_agg : ndarray
            Aggregated dataset array
        """
        if max_workers is None:
            max_workers = os.cpu_count()

        slices, shape = self._get_slices(self._dset,
                                         chunks_per_slice=chunks_per_worker)
        if len(slices) == 1:
            max_workers = 1

        dset_agg = self._create_agg_arr(shape, freq=freq, **resample_kwargs)

        if max_workers > 1:
            msg = ('Aggregating {} in parallel using {} workers'
                   .format(self._dset, max_workers))
            logger.info(msg)

            loggers = [__name__, 'rex', 'reVX']
            with SpawnProcessPool(max_workers=max_workers,
                                  loggers=loggers) as exe:
                futures = []
                for sites_slice in slices:
                    future = exe.submit(self._aggregate_sites,
                                        self._h5_fpath,
                                        self._dset,
                                        site_slice=sites_slice,
                                        freq=freq,
                                        method=method,
                                        time_index=self._time_index,
                                        year=self._year,
                                        local_time=self._local_time,
                                        **resample_kwargs)
                    futures.append(future)

                for i, future in enumerate(futures):
                    dset_agg[:, slices[i]] = future.result()
                    logger.debug('Completed {} out of {} workers'
                                 .format((i + 1), len(futures)))
        else:
            msg = 'Aggregating {} in serial'.format(self._dset)
            logger.info(msg)
            for i, sites_slice in enumerate(slices):
                dset_agg[:, sites_slice] = self._aggregate_sites(
                    self._h5_fpath,
                    self._dset,
                    site_slice=sites_slice,
                    freq=freq,
                    method=method,
                    time_index=self._time_index,
                    year=self._year,
                    local_time=self._local_time,
                    **resample_kwargs)
                logger.debug('Completed {} out of {} sets of sites'
                             .format((i + 1), len(slices)))

        return dset_agg

    @classmethod
    def run(cls, h5_fpath, dset, time_index=None, year=None, freq='1d',
            method='mean', max_workers=None, chunks_per_worker=5,
            local_time=False, **resample_kwargs):
        """
        Temporally aggregate dataset to given frequency using given method

        Parameters
        ----------
        h5_fpath : str
            Path to source h5 filepath
        dset : str
            Dataset to aggregate
        time_index : pandas.DatetimeIndex, optional
            Dataset datetime index, if None, extract from h5_fpath,
            by default None
        year : str | int, optional
            Year to extract time-index for if running on a multi-year file,
            by default None
        freq : str, optional
            Aggregation frequency, by default '1d'
        method : str, optional
            Aggregation method, either 'mean' or 'sum', by default 'mean'
        max_workers : None | int, optional
            Number of workers to use, if 1 run in serial, if None use all
            available cores, by default None
        chunks_per_worker : int, optional
            Number of chunks to extract on each worker, by default 5
        local_time : bool
            Flag to shift data to local time before aggregating temporal
            data. Default is to stay in UTC.
        resample_kwargs : dict, optional
            Kwargs for pandas.DataFrame.resample

        Returns
        -------
        agg_data : ndarray
            Dataset aggregated to given frequency using given method
        """
        logger.info('Aggregating {} in {} to {} using {}'
                    .format(dset, h5_fpath, freq, method))
        agg = cls(h5_fpath, dset, time_index=time_index, year=year,
                  local_time=local_time)
        agg_dset = agg.aggregate(freq=freq, method=method,
                                 max_workers=max_workers,
                                 chunks_per_worker=chunks_per_worker,
                                 **resample_kwargs)

        return agg_dset
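
# A minimal usage sketch for ``DatasetAgg``, assuming the source file
# contains a 2D 'cf_profile' dataset (the file path and dataset name are
# hypothetical placeholders):
#
# >>> daily_cf = DatasetAgg.run('./cf_profiles.h5', 'cf_profile',
# ...                           freq='1d', method='mean', max_workers=1)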

class TemporalAgg:
    """
    Class to temporally aggregate time-series data
    """
    def __init__(self, src_fpath, dst_fpath, freq='1d', dsets=None,
                 year=None, local_time=False, **resample_kwargs):
        """
        Parameters
        ----------
        src_fpath : str
            Path to source h5 file
        dst_fpath : str
            Path to destination h5 file to save aggregated datasets to.
        freq : str, optional
            Aggregation frequency, by default '1d'
        dsets : list, optional
            Datasets to aggregate, if None aggregate all datasets in
            src_fpath, by default None
        year : str | int, optional
            Year to extract time-index and datasets for, needed if running
            on a multi-year file, by default None
        local_time : bool
            Flag to shift data to local time before aggregating temporal
            data. Default is to stay in UTC.
        resample_kwargs : dict, optional
            Kwargs for pandas.DataFrame.resample
        """
        self._src_fpath = src_fpath
        self._dst_fpath = dst_fpath
        self._freq = freq
        self._local_time = local_time
        self._dsets, self._time_index = self._get_dsets(src_fpath,
                                                        dsets=dsets,
                                                        year=year)
        self._resample_kwargs = resample_kwargs
        self._init_agg_h5(year=year)

    def __repr__(self):
        msg = '{} of {}'.format(self.__class__.__name__, self._src_fpath)

        return msg

    @property
    def dsets(self):
        """
        Datasets to aggregate

        Returns
        -------
        list
        """
        return self._dsets

    @staticmethod
    def _get_dsets(h5_fpath, dsets=None, year=None):
        """
        Get datasets to aggregate, or if given, ensure they are in the
        source h5 file. Also extract time_index.

        Parameters
        ----------
        h5_fpath : str
            Path to .h5 file to source datasets for aggregation from
        dsets : list, optional
            Datasets to aggregate, if None aggregate all datasets in
            src_fpath, by default None
        year : str | int, optional
            Year to extract time-index and datasets for, needed if running
            on a multi-year file, by default None

        Returns
        -------
        dsets : list
            List of datasets to aggregate
        time_index : pandas.DatetimeIndex
            DatetimeIndex of datasets being aggregated
        """
        with Resource(h5_fpath) as f:
            if dsets is None:
                res_dsets = f.resource_datasets
                if year is not None:
                    res_dsets = [ds for ds in res_dsets
                                 if str(year) in ds
                                 and 'time_index' not in ds]

                dsets = []
                for ds in res_dsets:
                    if len(f.shapes[ds]) == 2:
                        dsets.append(ds)
            else:
                for ds in dsets:
                    msg = "{} is not available in {}".format(ds, h5_fpath)
                    assert ds in f, msg

        time_index = get_time_index(h5_fpath, year=year)

        return dsets, time_index

    def _init_agg_h5(self, year=None):
        """
        Initialize the dst .h5 file that will contain the aggregated
        datasets

        Parameters
        ----------
        year : str | int, optional
            Year to extract time-index and datasets for, needed if running
            on a multi-year file, by default None
        """
        logger.info('Initializing {}'.format(self._dst_fpath))
        time_index = pd.Series(0, index=self._time_index)
        time_index = time_index.resample(self._freq,
                                         **self._resample_kwargs)
        time_index = time_index.mean().index
        shape = (len(time_index), )

        ti_dset = 'time_index'
        if year is not None:
            ti_dset += '-{}'.format(year)

        with Outputs(self._dst_fpath, mode='a') as f_out:
            with Resource(self._src_fpath) as f_in:
                meta = f_in.meta
                shape += (len(meta), )
                f_out._shape = shape
                if 'meta' not in f_out:
                    logger.debug('Copying meta data')
                    f_out['meta'] = meta

                logger.debug('Copying {}'.format(ti_dset))
                f_out._set_time_index(ti_dset, time_index)

                for ds in self.dsets:
                    _, dtype, chunks = f_in.get_dset_properties(ds)
                    attrs = f_in.attrs[ds]
                    logger.debug('Initializing aggregated {} w/ properties:'
                                 '\nshape: {}'
                                 '\ndtype: {}'
                                 '\nchunks: {}'
                                 '\nattrs: {}'
                                 .format(ds, shape, dtype, chunks, attrs))
                    f_out._create_dset(ds, shape, dtype, chunks=chunks,
                                       attrs=attrs)

    def aggregate(self, method='mean', max_workers=None,
                  chunks_per_worker=5):
        """
        Aggregate desired datasets and write them to disk

        Parameters
        ----------
        method : str, optional
            Aggregation method, either 'mean' or 'sum', by default 'mean'
        max_workers : None | int, optional
            Number of workers to use, if 1 run in serial, if None use all
            available cores, by default None
        chunks_per_worker : int, optional
            Number of chunks to extract on each worker, by default 5
        """
        for ds in self.dsets:
            ds_agg = DatasetAgg.run(self._src_fpath, ds,
                                    time_index=self._time_index,
                                    freq=self._freq,
                                    method=method,
                                    max_workers=max_workers,
                                    chunks_per_worker=chunks_per_worker,
                                    local_time=self._local_time,
                                    **self._resample_kwargs)
            with Outputs(self._dst_fpath, mode='a') as f:
                logger.info('Writing aggregated data for {} to disk'
                            .format(ds))
                f[ds] = ds_agg

    @classmethod
    def run(cls, src_fpath, dst_fpath, freq='1d', dsets=None, year=None,
            method='mean', max_workers=None, chunks_per_worker=5,
            local_time=False, **resample_kwargs):
        """
        Temporally aggregate the desired datasets in the src .h5 file to
        the given frequency using the given method. Save the aggregated
        datasets to the dst .h5 file.

        Parameters
        ----------
        src_fpath : str
            Path to source h5 file
        dst_fpath : str
            Path to destination h5 file to save aggregated datasets to.
        freq : str, optional
            Aggregation frequency, by default '1d'
        dsets : list, optional
            Datasets to aggregate, if None aggregate all datasets in
            src_fpath, by default None
        year : str | int, optional
            Year to extract time-index and datasets for, needed if running
            on a multi-year file, by default None
        method : str, optional
            Aggregation method, either 'mean' or 'sum', by default 'mean'
        max_workers : None | int, optional
            Number of workers to use, if 1 run in serial, if None use all
            available cores, by default None
        chunks_per_worker : int, optional
            Number of chunks to extract on each worker, by default 5
        local_time : bool
            Flag to shift data to local time before aggregating temporal
            data. Default is to stay in UTC.
        resample_kwargs : dict, optional
            Kwargs for pandas.DataFrame.resample
        """
        logger.info('Aggregating datasets in {} to {} using {} and saving '
                    'to {}'.format(src_fpath, freq, method, dst_fpath))
        agg = cls(src_fpath, dst_fpath, freq=freq, dsets=dsets, year=year,
                  local_time=local_time, **resample_kwargs)
        agg.aggregate(method=method, max_workers=max_workers,
                      chunks_per_worker=chunks_per_worker)
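
# A minimal usage sketch for ``TemporalAgg``: aggregate all 2D datasets in a
# source file to monthly means and write them to a new .h5 file. Both file
# paths are hypothetical placeholders, and the extra ``closed`` kwarg is
# simply passed through to pandas.DataFrame.resample:
#
# >>> TemporalAgg.run('./cf_profiles.h5', './cf_monthly.h5', freq='1M',
# ...                 method='mean', max_workers=1, closed='left')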