# -*- coding: utf-8 -*-
"""
Classes to handle resource data
"""
import os
from abc import ABC, abstractmethod
from warnings import warn
import dateutil
import h5py
import numpy as np
import pandas as pd
from rex.sam_resource import SAMResource
from rex.utilities.exceptions import ResourceKeyError, ResourceRuntimeError
from rex.utilities.parse_keys import parse_keys, parse_slice
from rex.utilities.utilities import check_tz, get_lat_lon_cols
class BaseDatasetIterable(ABC):
"""Base class for file that is iterable over datasets. """
@property
@abstractmethod
def datasets(self):
"""iterable: Datasets available in file. """
def __iter__(self):
return iter(self.datasets)
class ResourceDataset:
"""
h5py.Dataset wrapper for Resource .h5 files
"""
def __init__(self, ds, scale_attr='scale_factor', add_attr='add_offset',
unscale=True):
"""
Parameters
----------
ds : h5py.dataset
Open .h5 dataset instance to extract data from
        scale_attr : str | list, optional
Name of scale factor attribute, by default 'scale_factor'. Can also
be a prioritized list of scale factor names.
        add_attr : str | list, optional
Name of add offset attribute, by default 'add_offset'. Can also
be a prioritized list of add offset names.
unscale : bool, optional
Flag to unscale dataset data, by default True
"""
self._ds = ds
self._scale_factor = self._parse_scale_add_attrs(scale_attr, 1)
self._adder = self._parse_scale_add_attrs(add_attr, 0)
self._unscale = unscale
if self._scale_factor == 1 and self._adder == 0:
self._unscale = False
def __repr__(self):
msg = "{} for {}".format(self.__class__.__name__, self.ds.name)
return msg
def __getitem__(self, ds_slice):
ds_slice = parse_slice(ds_slice)
return self._get_ds_slice(ds_slice)
@property
def ds(self):
"""
Open Dataset instance
Returns
-------
        h5py.Dataset | h5pyd.Dataset
"""
return self._ds
@property
def shape(self):
"""
Dataset shape
Returns
-------
tuple
"""
return self.ds.shape
@property
def size(self):
"""
Dataset size
Returns
-------
int
"""
return self.ds.size
@property
def dtype(self):
"""
Dataset dtype
Returns
-------
str | numpy.dtype
"""
return self.ds.dtype
@property
def chunks(self):
"""
Dataset chunk size
Returns
-------
tuple
"""
chunks = self.ds.chunks
if isinstance(chunks, dict):
chunks = tuple(chunks.get('dims', None))
return chunks
@property
def scale_factor(self):
"""
Dataset scale factor
Returns
-------
float
"""
return self._scale_factor
@property
def adder(self):
"""
Dataset add offset
Returns
-------
float
"""
return self._adder
def _parse_scale_add_attrs(self, attr, default):
"""Get the scale and add offset factors using one or more prioritized
scale/add attribute names.
Parameters
----------
        attr : str | list, optional
Name of scale factor or adder attribute. Can also
be a prioritized list of attr names.
default : float
Default factor if attr is not found
Returns
-------
factor : float
            Multiplicative scale factor or additive offset retrieved from
            dataset attributes.
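        Examples
        --------
        A minimal sketch of the prioritized lookup, where ``rd`` is a
        hypothetical ResourceDataset instance whose dataset carries a
        'scale_factor' attribute of 10 (values assumed for illustration):
        >>> rd._parse_scale_add_attrs(['scale_factor', 'psm_scale_factor'], 1)
        10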
"""
factor = default
if isinstance(attr, str):
attr = [attr]
for name in attr:
if name in self.ds.attrs:
factor = self.ds.attrs[name]
break
return factor
@staticmethod
def _check_slice(ds_slice):
"""
        Check ds_slice for lists and ensure all lists are of the same length
Parameters
----------
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from ds,
each arg is for a sequential axis
Returns
-------
        list_len : int | None
            List length, None if none of the args are a list | ndarray
        multi_list : bool
            Flag if multiple lists are provided in ds_slice
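        Examples
        --------
        A minimal sketch of the list checks (inputs assumed for
        illustration):
        >>> ResourceDataset._check_slice((slice(None), [1, 2, 3]))
        (3, False)
        >>> ResourceDataset._check_slice(([0, 1], [2, 3]))
        (2, True)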
"""
multi_list = False
list_len = []
for s in ds_slice:
if isinstance(s, (list, np.ndarray)):
list_len.append(len(s))
if list_len:
if len(list_len) > 1:
multi_list = True
list_len = list(set(list_len))
if len(list_len) > 1:
msg = ('shape mismatch: indexing arrays could not be '
'broadcast together with shapes {}'
.format(['({},)'.format(ln) for ln in list_len]))
raise IndexError(msg)
list_len = list_len[0]
else:
list_len = None
return list_len, multi_list
@staticmethod
def _make_list_slices(ds_slice, list_len):
"""
        Duplicate slice arguments to enable zipping of list slices with
        non-list slices
Parameters
----------
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from ds,
each arg is for a sequential axis
list_len : int
            List length
Returns
-------
zip_slices : list
List of slices to extract for each entry in list slice
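        Examples
        --------
        Non-list arguments are duplicated so they can be zipped against the
        list entries (inputs assumed for illustration):
        >>> ResourceDataset._make_list_slices((0, [1, 2, 3]), 3)
        [[0, 0, 0], [1, 2, 3]]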
"""
zip_slices = []
for s in ds_slice:
if not isinstance(s, (list, np.ndarray)):
zip_slices.append([s] * list_len)
else:
zip_slices.append(s)
return zip_slices
@staticmethod
def _list_to_slice(ds_slice):
"""
Check ds_slice to see if it is an int, slice, or list. Return
pieces required for fancy indexing based on input type.
Parameters
----------
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from ds,
each arg is for a sequential axis
Returns
-------
ds_slice : slice
Slice that encompasses the entire range
ds_idx : ndarray
Adjusted list to extract points of interest from sliced array
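        Examples
        --------
        A list is reduced to its bounding slice plus offsets into the
        sliced array (illustrative; exact reprs may vary by numpy version):
        >>> ResourceDataset._list_to_slice([2, 5, 7])
        (slice(2, 8, None), array([0, 3, 5]))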
"""
ds_idx = None
if isinstance(ds_slice, (list, np.ndarray)):
in_slice = np.array(ds_slice)
if np.issubdtype(in_slice.dtype, np.dtype(bool)):
in_slice = np.where(in_slice)[0]
s = in_slice.min()
e = in_slice.max() + 1
ds_slice = slice(s, e, None)
ds_idx = in_slice - s
elif isinstance(ds_slice, slice):
ds_idx = slice(None)
return ds_slice, ds_idx
@staticmethod
def _get_out_arr_slice(arr_slice, start):
"""
        Determine the slice of the pre-built output array that is being
        filled
Parameters
----------
arr_slice : tuple
Tuple of (int, slice, list, ndarray) for section of output array
being extracted
start : int
Start of slice, used for list gets
Returns
-------
out_slice : tuple
Slice arguments of portion of output array to insert arr_slice
into
stop : int
Stop of slice, used for list gets, will be new start upon
iteration
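        Examples
        --------
        A two-entry list paired with a full-axis slice fills rows 0-1 of
        the output array and advances the start to 2 (inputs assumed for
        illustration):
        >>> ResourceDataset._get_out_arr_slice((slice(None), [5, 9]), 0)
        ((slice(None, None, None), slice(0, 2, None)), 2)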
"""
out_slice = ()
int_slice = ()
int_start = start
int_stop = start
stop = start
for s in arr_slice:
if isinstance(s, slice):
out_slice += (slice(None), )
int_slice += (slice(None), )
elif isinstance(s, (int, np.integer)):
if int_start == int_stop:
int_slice += (int_start, )
int_stop += 1
elif isinstance(s, (list, tuple, np.ndarray)):
list_len = len(s)
if list_len == 1:
stop += 1
out_slice += ([start], )
else:
stop += len(s)
out_slice += (slice(start, stop), )
if not out_slice:
out_slice += (start, )
stop += 1
elif all(s == slice(None) for s in out_slice):
out_slice = int_slice
stop = int_stop
return out_slice, stop
def _get_out_arr_shape(self, ds_slice):
"""
Determine shape of output array
Parameters
----------
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from ds,
each arg is for a sequential axis
Returns
-------
out_shape : tuple
Shape of output array
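        Examples
        --------
        A sketch for a hypothetical dataset of shape (8760, 100), where
        ``rd`` is an assumed ResourceDataset instance:
        >>> rd._get_out_arr_shape((slice(None), [2, 3, 5]))
        (8760, 3)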
"""
ds_shape = self.shape
out_shape = ()
contains_list = False
ds_slice += (slice(None), ) * (len(ds_shape) - len(ds_slice))
for i, ax_slice in enumerate(ds_slice):
if isinstance(ax_slice, slice):
stop = ax_slice.stop
if stop is None:
stop = ds_shape[i]
out_shape += (len(range(*ax_slice.indices(stop))), )
if isinstance(ax_slice, (list, tuple, np.ndarray)):
if not contains_list:
out_shape += (len(ax_slice), )
contains_list = True
return out_shape
def _extract_list_slice(self, ds_slice):
"""
        Optimize and extract a list slice request along a single dimension.
        This function checks if sequential gid requests are more than one
        chunk size apart and, if so, splits them into multiple separate
        requests (it is more efficient to do multiple reads than to read
        all gids in-between).
Parameters
----------
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from ds,
each arg is for a sequential axis
Returns
-------
out : ndarray
Extracted array of data from ds
"""
out_slices = []
chunks = self.chunks
sort_idx = []
list_len = None
if chunks:
for i, ax_slice in enumerate(ds_slice):
c = chunks[i]
if isinstance(ax_slice, (list, np.ndarray)):
if not isinstance(ax_slice, np.ndarray):
ax_slice = np.array(ax_slice)
idx = np.argsort(ax_slice)
sort_idx.append(np.argsort(idx))
ax_slice = ax_slice[idx]
# this checks if sequential gid requests are more than one
# chunk size apart and then splits them into multiple
# separate requests
diff = np.diff(ax_slice) > c
if np.any(diff):
pos = np.where(diff)[0] + 1
ax_slice = np.split(ax_slice, pos)
list_len = len(ax_slice)
elif isinstance(ax_slice, slice):
sort_idx.append(slice(None))
out_slices.append(ax_slice)
else:
out_slices = ds_slice
if list_len is not None:
out_shape = self._get_out_arr_shape(ds_slice)
out_slices = self._make_list_slices(out_slices, list_len)
out = np.zeros(out_shape, dtype=self.dtype)
start = 0
for s in zip(*out_slices):
arr_slice, stop = self._get_out_arr_slice(s, start)
out[arr_slice] = self._extract_ds_slice(s)
start = stop
out = out[tuple(sort_idx)]
else:
out = self._extract_ds_slice(ds_slice)
return out
def _extract_multi_list_slice(self, ds_slice, list_len):
"""
Extract ds_slice that has multiple lists
Parameters
----------
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from ds,
each arg is for a sequential axis
list_len : int
            List length
Returns
-------
out : ndarray
Extracted array of data from ds
"""
zip_slices = self._make_list_slices(ds_slice, list_len)
out_shape = self._get_out_arr_shape(ds_slice)
out = np.zeros(out_shape, dtype=self.dtype)
start = 0
for s in zip(*zip_slices):
arr_slice, stop = self._get_out_arr_slice(s, start)
arr = self._extract_ds_slice(s)
out[arr_slice] = arr
start = stop
return out
def _extract_ds_slice(self, ds_slice):
"""
        Extract ds_slice from ds using slices where possible
Parameters
----------
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from ds,
each arg is for a sequential axis
Returns
-------
out : ndarray
Extracted array of data from ds
"""
slices = ()
idx_slice = ()
for ax_slice in ds_slice:
ax_slice, ax_idx = self._list_to_slice(ax_slice)
slices += (ax_slice,)
if ax_idx is not None:
idx_slice += (ax_idx,)
try:
out = self.ds[slices]
except Exception as e:
msg = (f'Error retrieving data from "{self.ds}" for '
f'slice: "{slices}".')
not_h5_ds = not isinstance(self.ds, h5py.Dataset)
is_hsds_ds = 'h5pyd' in str(type(self.ds))
error_type = isinstance(e, (OSError, IOError))
if not_h5_ds and is_hsds_ds and error_type:
                msg += (' Detected OSError/IOError from h5pyd. '
                        'This is not a rex error, so please do not submit '
                        'a bug report. This is likely due to HSDS server '
                        'limits, especially if you are using an NREL '
                        'developer API key. For more details, see: '
                        'https://nrel.github.io/rex/misc/examples.hsds.html')
raise ResourceRuntimeError(msg) from e
# check to see if idx_slice needs to be applied
if any(s != slice(None) if isinstance(s, slice) else True
for s in idx_slice):
out = out[idx_slice]
return out
def _unscale_data(self, data):
"""
Unscale dataset data
Parameters
----------
data : ndarray
Native dataset array
Returns
-------
data : ndarray
Unscaled dataset array
"""
        data = data.astype('float32')
        if self.adder != 0:
            # netCDF CF-style convention: native = scaled * scale + offset
            data *= self.scale_factor
            data += self.adder
        else:
            # rex convention: data are stored as native * scale_factor,
            # so unscaling divides by the scale factor
            data /= self.scale_factor
return data
def _get_ds_slice(self, ds_slice):
"""
Get ds_slice from ds as efficiently as possible, unscale if desired
Parameters
----------
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from ds,
each arg is for a sequential axis
Returns
-------
out : ndarray
Extracted array of data from ds
"""
list_len, multi_list = self._check_slice(ds_slice)
if list_len is not None:
if multi_list:
out = self._extract_multi_list_slice(ds_slice, list_len)
else:
out = self._extract_list_slice(ds_slice)
else:
out = self._extract_ds_slice(ds_slice)
if self._unscale:
out = self._unscale_data(out)
return out
class BaseResource(BaseDatasetIterable):
"""
Abstract Base class to handle resource .h5 files
"""
SCALE_ATTR = 'scale_factor'
ADD_ATTR = 'add_offset'
UNIT_ATTR = 'units'
def __init__(self, h5_file, mode='r', unscale=True, str_decode=True,
group=None, hsds=False, hsds_kwargs=None):
"""
Parameters
----------
h5_file : str
Path to .h5 resource file
mode : str, optional
Mode to instantiate h5py.File instance, by default 'r'
unscale : bool, optional
Boolean flag to automatically unscale variables on extraction,
by default True
str_decode : bool, optional
Boolean flag to decode the bytestring meta data into normal
strings. Setting this to False will speed up the meta data read,
by default True
group : str, optional
Group within .h5 resource file to open, by default None
hsds : bool, optional
Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS
behind HSDS, by default False
hsds_kwargs : dict, optional
Dictionary of optional kwargs for h5pyd, e.g., bucket, username,
password, by default None
"""
self.h5_file = h5_file
if hsds:
if mode != 'r':
                raise OSError('Cannot write to files accessed via HSDS!')
import h5pyd
if hsds_kwargs is None:
hsds_kwargs = {}
self._h5 = h5pyd.File(self.h5_file, mode='r', use_cache=False,
**hsds_kwargs)
else:
try:
self._h5 = h5py.File(self.h5_file, mode=mode)
except Exception as e:
msg = ('Could not open file in mode "{}": "{}"'
.format(mode, self.h5_file))
raise OSError(msg) from e
self._group = group
self._unscale = unscale
self._meta = None
self._time_index = None
self._lat_lon = None
self._str_decode = str_decode
self._attrs = None
self._shapes = None
self._chunks = None
self._dtypes = None
def __repr__(self):
msg = "{} for {}".format(self.__class__.__name__, self.h5_file)
return msg
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
if type is not None:
raise
def __len__(self):
return self.h5['time_index'].shape[0]
def __getitem__(self, keys):
ds, ds_slice = parse_keys(keys)
_, ds_name = os.path.split(ds)
if ds_name.startswith('time_index'):
out = self._get_time_index(ds, ds_slice)
elif ds_name.startswith('meta'):
out = self._get_meta(ds, ds_slice)
elif ds_name.startswith('coordinates'):
out = self._get_coords(ds, ds_slice)
elif 'SAM' in ds_name:
site = ds_slice[0]
if isinstance(site, (int, np.integer)):
out = self.get_SAM_df(site) # pylint: disable=E1111
else:
msg = "Can only extract SAM DataFrame for a single site"
raise ResourceRuntimeError(msg)
else:
out = self._get_ds(ds, ds_slice)
return out
def __contains__(self, dset):
return dset in self.datasets
@classmethod
def _get_datasets(cls, h5_obj, group=None):
"""
Search h5 file instance for Datasets
Parameters
----------
        h5_obj : h5py.File | h5py.Group
            Open h5py File or Group instance to search
        group : str, optional
            Name of the parent group used to prefix dataset names,
            by default None
Returns
-------
dsets : list
List of datasets in h5_obj
"""
dsets = []
for name in h5_obj:
sub_obj = h5_obj[name]
if isinstance(sub_obj, h5py.Group):
dsets.extend(cls._get_datasets(sub_obj, group=name))
else:
dset_name = name
if group is not None:
dset_name = "{}/{}".format(group, dset_name)
dsets.append(dset_name)
return dsets
@property
def h5(self):
"""
Open h5py File instance. If _group is not None return open Group
Returns
-------
h5 : h5py.File | h5py.Group
"""
h5 = self._h5
if self._group is not None:
h5 = h5[self._group]
return h5
@property
def datasets(self):
"""
Datasets available
Returns
-------
list
"""
return self._get_datasets(self.h5)
@property
def dsets(self):
"""
Datasets available
Returns
-------
list
"""
return self.datasets
@property
def resource_datasets(self):
"""
Available resource datasets
Returns
-------
list
"""
dsets = [ds for ds in self.datasets
if ds not in ['meta', 'time_index', 'coordinates']]
return dsets
@property
def res_dsets(self):
"""
Available resource datasets
Returns
-------
list
"""
return self.resource_datasets
@property
def groups(self):
"""
Groups available
Returns
-------
groups : list
List of groups
"""
groups = []
for name in self.h5:
if isinstance(self.h5[name], h5py.Group):
groups.append(name)
return groups
@property
def shape(self):
"""
Resource shape (timesteps, sites)
shape = (len(time_index), len(meta))
Returns
-------
shape : tuple
"""
shape = (self.h5['time_index'].shape[0], self.h5['meta'].shape[0])
return shape
@property
def meta(self):
"""
Resource meta data DataFrame
Returns
-------
meta : pandas.DataFrame
"""
if self._meta is None:
if 'meta' in self.h5:
self._meta = self._get_meta('meta', slice(None))
else:
raise ResourceKeyError("'meta' is not a valid dataset")
return self._meta
@property
def time_index(self):
"""
Resource DatetimeIndex
Returns
-------
time_index : pandas.DatetimeIndex
"""
if self._time_index is None:
if 'time_index' in self.h5:
self._time_index = self._get_time_index('time_index',
slice(None))
else:
raise ResourceKeyError("'time_index' is not a valid dataset!")
return self._time_index
@property
def coordinates(self):
"""
Coordinates: (lat, lon) pairs
Returns
-------
lat_lon : ndarray
"""
return self.lat_lon
@property
def lat_lon(self):
"""
Extract (latitude, longitude) pairs
Returns
-------
lat_lon : ndarray
"""
if self._lat_lon is None:
if 'coordinates' in self:
self._lat_lon = self._get_coords('coordinates', slice(None))
else:
lat_lon_cols = get_lat_lon_cols(self.meta)
self._lat_lon = self.meta[lat_lon_cols].values
return self._lat_lon
@property
def data_version(self):
"""
Get the version attribute of the data. None if not available.
Returns
-------
version : str | None
"""
return self.global_attrs.get('version', None)
@property
def global_attrs(self):
"""
Global (file) attributes
Returns
-------
global_attrs : dict
"""
return dict(self.h5.attrs)
@property
def attrs(self):
"""
Dictionary of all dataset attributes
Returns
-------
attrs : dict
"""
if self._attrs is None:
self._attrs = {}
for dset in self.datasets:
self._attrs[dset] = dict(self.h5[dset].attrs)
return self._attrs
@property
def shapes(self):
"""
Dictionary of all dataset shapes
Returns
-------
shapes : dict
"""
if self._shapes is None:
self._shapes = {}
for dset in self.datasets:
self._shapes[dset] = self.h5[dset].shape
return self._shapes
@property
def dtypes(self):
"""
Dictionary of all dataset dtypes
Returns
-------
dtypes : dict
"""
if self._dtypes is None:
self._dtypes = {}
for dset in self.datasets:
self._dtypes[dset] = self.h5[dset].dtype
return self._dtypes
@property
def chunks(self):
"""
Dictionary of all dataset chunk sizes
Returns
-------
chunks : dict
"""
if self._chunks is None:
self._chunks = {}
for dset in self.datasets:
self._chunks[dset] = self._check_chunks(self.h5[dset].chunks)
return self._chunks
@property
def adders(self):
"""
Dictionary of all dataset add offset factors
Returns
-------
adders : dict
"""
return self._parse_attr_names(self.ADD_ATTR, 0)
@property
def scale_factors(self):
"""
Dictionary of all dataset scale factors
Returns
-------
scale_factors : dict
"""
return self._parse_attr_names(self.SCALE_ATTR, 1)
@property
def units(self):
"""
Dictionary of all dataset units
Returns
-------
units : dict
"""
return self._parse_attr_names(self.UNIT_ATTR, None)
def _parse_attr_names(self, attr_names, default):
"""Retrieve an attribute from all dataset attributes.
Parameters
----------
attr_names : str | list
Single or prioritized list of attribute names to retrieve, e.g.
"scale_factor" or ["scale_factor", "psm_scale_factor"]
default : None | int | float
Default value if attr_names not found in any given dataset.
Returns
-------
out : dict
Dictionary mapping datasets (keys) to attribute values (values),
e.g. if attr_names="scale_factor", out would be:
{"windspeed_10m": 10, "misc": 1}
"""
if isinstance(attr_names, str):
attr_names = [attr_names]
out = {}
for dset, attrs in self.attrs.items():
out[dset] = default
for name in attr_names:
if name in attrs:
out[dset] = attrs[name]
break
return out
@staticmethod
def _check_chunks(chunks):
"""
Check to see if chunks is an HSDS dictionary, if so convert to a tuple
Parameters
----------
chunks : tuple | dict | None
tuple of chunk size, None, or HSDS chunk dictionary
Returns
-------
chunks : tuple
Tuple of chunk size along all axes
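        Examples
        --------
        HSDS-style chunk dictionaries are reduced to their 'dims' entry,
        while tuples pass through unchanged (values assumed for
        illustration):
        >>> BaseResource._check_chunks({'dims': [8760, 100]})
        (8760, 100)
        >>> BaseResource._check_chunks((8760, 100))
        (8760, 100)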
"""
if isinstance(chunks, dict):
chunks = tuple(chunks.get('dims', None))
return chunks
@staticmethod
def df_str_decode(df):
"""Decode a dataframe with byte string columns into ordinary str cols.
Parameters
----------
df : pd.DataFrame
Dataframe with some columns being byte strings.
Returns
-------
df : pd.DataFrame
DataFrame with str columns instead of byte str columns.
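        Examples
        --------
        A minimal sketch with a single byte-string column (data assumed
        for illustration):
        >>> df = pd.DataFrame({'country': [b'United States']})
        >>> BaseResource.df_str_decode(df)['country'][0]
        'United States'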
"""
for col in df:
if (np.issubdtype(df[col].dtype, np.object_)
and isinstance(df[col].values[0], bytes)):
df[col] = df[col].copy().str.decode('utf-8', 'ignore')
return df
def open_dataset(self, ds_name):
"""
Open resource dataset
Parameters
----------
ds_name : str
Dataset name to open
Returns
-------
ds : ResourceDataset
            ResourceDataset instance for the open resource dataset
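        Examples
        --------
        Usage sketch, assuming a file containing a 'wind_speed' dataset:
        >>> file = '$TESTDATADIR/nsrdb/ri_100_nsrdb_2012.h5'
        >>> with Resource(file) as res:
        >>>     ds = res.open_dataset('wind_speed')
        >>>     data = ds[:, 0]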
"""
if ds_name not in self.datasets:
raise ResourceKeyError('{} not in {}'
.format(ds_name, self.datasets))
ds = ResourceDataset(self.h5[ds_name], scale_attr=self.SCALE_ATTR,
add_attr=self.ADD_ATTR, unscale=self._unscale)
return ds
def get_attrs(self, dset=None):
"""
Get h5 attributes either from file or dataset
Parameters
----------
dset : str
Dataset to get attributes for, if None get file (global) attributes
Returns
-------
attrs : dict
Dataset or file attributes
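        Examples
        --------
        Usage sketch, assuming a file containing a 'wind_speed' dataset:
        >>> file = '$TESTDATADIR/nsrdb/ri_100_nsrdb_2012.h5'
        >>> with Resource(file) as res:
        >>>     global_attrs = res.get_attrs()
        >>>     ds_attrs = res.get_attrs(dset='wind_speed')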
"""
if dset is None:
attrs = dict(self.h5.attrs)
else:
attrs = dict(self.h5[dset].attrs)
return attrs
def get_dset_properties(self, dset):
"""
Get dataset properties (shape, dtype, chunks)
Parameters
----------
dset : str
            Dataset to get properties for
Returns
-------
shape : tuple
Dataset array shape
dtype : str
Dataset array dtype
chunks : tuple
Dataset chunk size
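        Examples
        --------
        Usage sketch, assuming a file containing a 'wind_speed' dataset:
        >>> file = '$TESTDATADIR/nsrdb/ri_100_nsrdb_2012.h5'
        >>> with Resource(file) as res:
        >>>     shape, dtype, chunks = res.get_dset_properties('wind_speed')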
"""
ds = self.h5[dset]
shape, dtype, chunks = ds.shape, ds.dtype, ds.chunks
return shape, dtype, self._check_chunks(chunks)
def get_scale_factor(self, dset):
"""
Get dataset scale factor
Parameters
----------
dset : str
Dataset to get scale factor for
Returns
-------
float
Dataset scale factor, used to unscale int values to floats
"""
return self.scale_factors[dset]
# pylint: disable=redefined-argument-from-local
def get_units(self, dset):
"""
Get dataset units
Parameters
----------
dset : str
Dataset to get units for
Returns
-------
str
Dataset units, None if not defined
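        Examples
        --------
        If the exact name is not in the file, the first resource dataset
        sharing its prefix is used instead (dataset name assumed for
        illustration):
        >>> file = '$TESTDATADIR/nsrdb/ri_100_nsrdb_2012.h5'
        >>> with Resource(file) as res:
        >>>     units = res.get_units('windspeed_100m')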
"""
if dset not in self:
name = dset.split('_')[0]
for dset in self.resource_datasets:
if dset.startswith(name):
break
return self.units[dset]
def _get_time_index(self, ds_name, ds_slice):
"""
Extract and convert time_index to pandas Datetime Index
Parameters
----------
ds_name : str
Dataset to extract time_index from
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from
time_index
Returns
-------
time_index : pandas.DatetimeIndex
Vector of datetime stamps
"""
ds_slice = parse_slice(ds_slice)
time_index = self.h5[ds_name]
time_index = ResourceDataset.extract(time_index, ds_slice[0],
unscale=False)
try:
datetime_index = pd.to_datetime(time_index.astype(str))
except (pd.errors.OutOfBoundsDatetime, dateutil.parser.ParserError):
return time_index
return check_tz(datetime_index)
def _get_meta(self, ds_name, ds_slice):
"""
Extract and convert meta to a pandas DataFrame
Parameters
----------
ds_name : str
Dataset to extract meta from
ds_slice : tuple
Tuple of (int, slice, list, ndarray, str) of what sites and columns
to extract from meta
Returns
-------
        meta : pandas.DataFrame
Dataframe of location meta data
"""
ds_slice = parse_slice(ds_slice)
sites = ds_slice[0]
if isinstance(sites, (int, np.integer)):
sites = slice(sites, sites + 1)
meta = self.h5[ds_name]
meta = ResourceDataset.extract(meta, sites, unscale=False)
if isinstance(sites, slice):
stop = sites.stop
if stop is None:
stop = len(meta)
sites = list(range(*sites.indices(stop)))
meta = pd.DataFrame(meta, index=sites)
if 'gid' not in meta:
meta.index.name = 'gid'
if self._str_decode:
meta = self.df_str_decode(meta)
if len(ds_slice) == 2:
meta = meta[ds_slice[1]]
return meta
def _get_coords(self, ds_name, ds_slice):
"""
Extract coordinates (lat, lon) pairs
Parameters
----------
ds_name : str
Dataset to extract coordinates from
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from
coordinates, each arg is for a sequential axis
Returns
-------
coords : ndarray
Array of (lat, lon) pairs for each site in meta
"""
ds_slice = parse_slice(ds_slice)
coords = self.h5[ds_name]
coords = ResourceDataset.extract(coords, ds_slice[0],
unscale=False)
return coords
# pylint: disable=unused-argument
def get_SAM_df(self, site):
"""
        Placeholder for the get_SAM_df method, which is resource specific
Parameters
----------
site : int
Site to extract SAM DataFrame for
"""
msg = ('Method to retrieve SAM dataframe not implemented for vanilla '
'Resource handler. Use an NSRDB or WTK handler instead.')
raise NotImplementedError(msg)
def _get_ds(self, ds_name, ds_slice):
"""
Extract data from given dataset
Parameters
----------
ds_name : str
Variable dataset to be extracted
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from ds,
each arg is for a sequential axis
Returns
-------
out : ndarray
ndarray of variable timeseries data
            If unscale, data is returned in native units, else in scaled
            units
"""
if ds_name not in self.datasets:
raise ResourceKeyError('{} not in {}'
.format(ds_name, self.datasets))
ds = self.h5[ds_name]
ds_slice = parse_slice(ds_slice)
if len(ds_slice) > len(ds.shape):
return self._get_ds_with_repeated_values(ds, ds_name, ds_slice)
return ResourceDataset.extract(ds, ds_slice,
scale_attr=self.SCALE_ATTR,
add_attr=self.ADD_ATTR,
unscale=self._unscale)
def _get_ds_with_repeated_values(self, ds, ds_name, ds_slice):
"""
Extract 1D data using 2D slice by repeating the 1D data along
the spatial or temporal dimension
Parameters
----------
ds : h5py.dataset
Open .h5 dataset instance to extract data from
ds_name : str
Variable dataset to be extracted
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from ds,
each arg is for a sequential axis
Returns
-------
out : ndarray
ndarray of variable timeseries data
            If unscale, data is returned in native units, else in scaled
            units
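        Examples
        --------
        Usage sketch: a 2D slice of a hypothetical 1D site-only dataset
        'site_elevation' broadcasts the values along the temporal axis and
        emits a warning:
        >>> file = '$TESTDATADIR/nsrdb/ri_100_nsrdb_2012.h5'
        >>> with Resource(file) as res:
        >>>     arr = res['site_elevation', :, [0, 1]]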
"""
ti_shape = self.shapes.get('time_index')
meta_shape = self.shapes.get('meta')
if ti_shape == meta_shape:
msg = ("Attempting to use a 2D slice on a 1D dataset when the "
"meta and time index have the same shape - unable to "
"disambiguate the slice dimensions. Please either update "
"the length of the meta and/or index, set the shape of "
"{!r} to be 2-dimensional (current shape: {!r}), or use a "
"1-dimensional slice.".format(ds_name, ds.shape))
raise ResourceRuntimeError(msg)
if ds.shape == ti_shape:
return self._get_ds_with_spatial_repeat(ds, ds_name, ds_slice)
if ds.shape == meta_shape:
return self._get_ds_with_temporal_repeat(ds, ds_name, ds_slice)
msg = ("Attempting to use a 2D slice on a 1D dataset ({0!r}) when "
"the shape of the dataset {1!r} does not match the shape "
"of the meta {2!r})or the time index {3!r}. Please either "
"update the length of ({0!r}) to match either the meta or "
"index, or use a 1-dimensional slice."
.format(ds_name, ds.shape, meta_shape, ti_shape))
raise ResourceRuntimeError(msg)
def _get_ds_with_spatial_repeat(self, ds, ds_name, ds_slice):
"""
Extract 1D data using 2D slice by repeating the 1D data along
the spatial dimension
Parameters
----------
ds : h5py.dataset
Open .h5 dataset instance to extract data from
ds_name : str
Variable dataset to be extracted
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from ds,
each arg is for a sequential axis
Returns
-------
out : ndarray
ndarray of variable timeseries data
            If unscale, data is returned in native units, else in scaled
            units
"""
msg = ("Attempting to use a 2D slice on a 1D dataset ({!r}). 1D "
"dataset values will be repeated along the spatial dimension"
.format(ds_name))
warn(msg)
out = ResourceDataset.extract(ds, ds_slice[:1],
scale_attr=self.SCALE_ATTR,
add_attr=self.ADD_ATTR,
unscale=self._unscale)
if not isinstance(out, np.ndarray):
out *= np.ones(self.shapes['meta'], dtype=np.float32)
out = out[ds_slice[1]]
else:
out = np.repeat(out[:, None], self.shapes['meta'][0], axis=1)
out = out[:, ds_slice[1]]
return out.astype(np.float32)
def _get_ds_with_temporal_repeat(self, ds, ds_name, ds_slice):
"""
Extract 1D data using 2D slice by repeating the 1D data along
the temporal dimension
Parameters
----------
ds : h5py.dataset
Open .h5 dataset instance to extract data from
ds_name : str
Variable dataset to be extracted
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from ds,
each arg is for a sequential axis
Returns
-------
out : ndarray
ndarray of variable timeseries data
            If unscale, data is returned in native units, else in scaled
            units
"""
msg = ("Attempting to use a 2D slice on a 1D dataset ({!r}). 1D "
"dataset values will be repeated along the temporal dimension"
.format(ds_name))
warn(msg)
out = ResourceDataset.extract(ds, ds_slice[1:],
scale_attr=self.SCALE_ATTR,
add_attr=self.ADD_ATTR,
unscale=self._unscale)
if not isinstance(out, np.ndarray):
out *= np.ones(self.shapes['time_index'], dtype=np.float32)
else:
out = np.ones((self.shapes['time_index'][0], len(out))) * out
return out[ds_slice[0]].astype(np.float32)
def close(self):
"""
Close h5 instance
"""
self._h5.close()
@staticmethod
def _preload_SAM(res, sites, tech, time_index_step=None, means=False):
"""
Placeholder method to pre-load project_points for SAM
Parameters
----------
res : rex.Resource
rex Resource handler or similar (NSRDB, WindResource,
MultiFileResource, etc...)
sites : list
List of sites to be provided to SAM
(sites is synonymous with gids aka spatial indices)
tech : str
Technology to be run by SAM
time_index_step: int, optional
Step size for time_index, used to reduce temporal resolution,
by default None
means : bool, optional
Boolean flag to compute mean resource when res_array is set,
by default False
"""
time_slice = slice(None, None, time_index_step)
SAM_res = SAMResource(sites, tech, res['time_index', time_slice],
means=means)
sites = SAM_res.sites_slice
SAM_res['meta'] = res['meta', sites]
SAM_res.load_rex_resource(res, SAM_res.var_list, time_slice, sites)
return SAM_res
@classmethod
def preload_SAM(cls, h5_file, sites, tech, unscale=True, str_decode=True,
group=None, hsds=False, hsds_kwargs=None,
time_index_step=None, means=False):
"""
Pre-load project_points for SAM
Parameters
----------
h5_file : str
h5_file to extract resource from
sites : list
List of sites to be provided to SAM
(sites is synonymous with gids aka spatial indices)
tech : str
Technology to be run by SAM
unscale : bool
Boolean flag to automatically unscale variables on extraction
str_decode : bool
Boolean flag to decode the bytestring meta data into normal
strings. Setting this to False will speed up the meta data read.
group : str
Group within .h5 resource file to open
hsds : bool, optional
Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS
behind HSDS, by default False
hsds_kwargs : dict, optional
Dictionary of optional kwargs for h5pyd, e.g., bucket, username,
password, by default None
time_index_step: int, optional
Step size for time_index, used to reduce temporal resolution,
by default None
means : bool, optional
Boolean flag to compute mean resource when res_array is set,
by default False
Returns
-------
SAM_res : SAMResource
            Instance of SAMResource pre-loaded with resource data for sites
            in project_points
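        Examples
        --------
        Minimal usage sketch (file path and tech assumed for illustration;
        typically invoked on a tech-specific subclass such as WindResource
        rather than this base class):
        >>> file = '$TESTDATADIR/wtk/ri_100_wtk_2012.h5'
        >>> SAM_res = Resource.preload_SAM(file, [0, 1], 'windpower')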
"""
kwargs = {"unscale": unscale, "hsds": hsds, 'hsds_kwargs': hsds_kwargs,
"str_decode": str_decode, "group": group}
with cls(h5_file, **kwargs) as res:
SAM_res = res._preload_SAM(res, sites, tech,
time_index_step=time_index_step,
means=means)
return SAM_res
class Resource(BaseResource):
"""
Base class to handle resource .h5 files
Examples
--------
Extracting the resource's Datetime Index
>>> file = '$TESTDATADIR/nsrdb/ri_100_nsrdb_2012.h5'
>>> with Resource(file) as res:
>>> ti = res.time_index
>>>
>>> ti
DatetimeIndex(['2012-01-01 00:00:00', '2012-01-01 00:30:00',
'2012-01-01 01:00:00', '2012-01-01 01:30:00',
'2012-01-01 02:00:00', '2012-01-01 02:30:00',
'2012-01-01 03:00:00', '2012-01-01 03:30:00',
'2012-01-01 04:00:00', '2012-01-01 04:30:00',
...
'2012-12-31 19:00:00', '2012-12-31 19:30:00',
'2012-12-31 20:00:00', '2012-12-31 20:30:00',
'2012-12-31 21:00:00', '2012-12-31 21:30:00',
'2012-12-31 22:00:00', '2012-12-31 22:30:00',
'2012-12-31 23:00:00', '2012-12-31 23:30:00'],
dtype='datetime64[ns]', length=17568, freq=None)
Efficient slicing of the Datetime Index
>>> with Resource(file) as res:
>>> ti = res['time_index', 1]
>>>
>>> ti
2012-01-01 00:30:00
>>> with Resource(file) as res:
>>> ti = res['time_index', :10]
>>>
>>> ti
DatetimeIndex(['2012-01-01 00:00:00', '2012-01-01 00:30:00',
'2012-01-01 01:00:00', '2012-01-01 01:30:00',
'2012-01-01 02:00:00', '2012-01-01 02:30:00',
'2012-01-01 03:00:00', '2012-01-01 03:30:00',
'2012-01-01 04:00:00', '2012-01-01 04:30:00'],
dtype='datetime64[ns]', freq=None)
>>> with Resource(file) as res:
    >>> ti = res['time_index', [1, 2, 4, 8, 9]]
>>>
>>> ti
DatetimeIndex(['2012-01-01 00:30:00', '2012-01-01 01:00:00',
'2012-01-01 02:00:00', '2012-01-01 04:00:00',
'2012-01-01 04:30:00'],
dtype='datetime64[ns]', freq=None)
Extracting resource's site metadata
>>> with Resource(file) as res:
>>> meta = res.meta
>>>
>>> meta
latitude longitude elevation timezone country ...
0 41.29 -71.86 0.000000 -5 None ...
1 41.29 -71.82 0.000000 -5 None ...
2 41.25 -71.82 0.000000 -5 None ...
3 41.33 -71.82 15.263158 -5 United States ...
4 41.37 -71.82 25.360000 -5 United States ...
.. ... ... ... ... ... ...
95 41.25 -71.66 0.000000 -5 None ...
96 41.89 -71.66 153.720000 -5 United States ...
97 41.45 -71.66 35.440000 -5 United States ...
98 41.61 -71.66 140.200000 -5 United States ...
99 41.41 -71.66 35.160000 -5 United States ...
[100 rows x 10 columns]
Efficient slicing of the metadata
>>> with Resource(file) as res:
>>> meta = res['meta', 1]
>>>
>>> meta
latitude longitude elevation timezone country state county urban ...
1 41.29 -71.82 0.0 -5 None None None None ...
>>> with Resource(file) as res:
>>> meta = res['meta', :5]
>>>
>>> meta
latitude longitude elevation timezone country ...
0 41.29 -71.86 0.000000 -5 None ...
1 41.29 -71.82 0.000000 -5 None ...
2 41.25 -71.82 0.000000 -5 None ...
3 41.33 -71.82 15.263158 -5 United States ...
4 41.37 -71.82 25.360000 -5 United States ...
>>> with Resource(file) as res:
>>> tz = res['meta', :, 'timezone']
>>>
>>> tz
0 -5
1 -5
2 -5
3 -5
4 -5
..
95 -5
96 -5
97 -5
98 -5
99 -5
Name: timezone, Length: 100, dtype: int64
>>> with Resource(file) as res:
>>> lat_lon = res['meta', :, ['latitude', 'longitude']]
>>>
>>> lat_lon
latitude longitude
0 41.29 -71.86
1 41.29 -71.82
2 41.25 -71.82
3 41.33 -71.82
4 41.37 -71.82
.. ... ...
95 41.25 -71.66
96 41.89 -71.66
97 41.45 -71.66
98 41.61 -71.66
99 41.41 -71.66
[100 rows x 2 columns]
Extracting resource variables (datasets)
>>> with Resource(file) as res:
>>> wspd = res['wind_speed']
>>>
>>> wspd
[[12. 12. 12. ... 12. 12. 12.]
[12. 12. 12. ... 12. 12. 12.]
[12. 12. 12. ... 12. 12. 12.]
...
[14. 14. 14. ... 14. 14. 14.]
[15. 15. 15. ... 15. 15. 15.]
[15. 15. 15. ... 15. 15. 15.]]
Efficient slicing of variables
>>> with Resource(file) as res:
>>> wspd = res['wind_speed', :2]
>>>
>>> wspd
[[12. 12. 12. 12. 12. 12. 53. 53. 53. 53. 53. 12. 53. 1. 1. 12. 12. 12.
1. 1. 12. 53. 53. 53. 12. 12. 12. 12. 12. 1. 12. 12. 1. 12. 12. 53.
12. 53. 1. 12. 1. 53. 53. 12. 12. 12. 12. 1. 1. 1. 12. 12. 1. 1.
12. 12. 53. 53. 53. 12. 12. 53. 53. 12. 12. 12. 12. 12. 12. 1. 53. 1.
53. 12. 12. 53. 53. 1. 1. 1. 53. 12. 1. 1. 53. 53. 53. 12. 12. 12.
12. 12. 12. 12. 1. 12. 1. 12. 12. 12.]
[12. 12. 12. 12. 12. 12. 53. 53. 53. 53. 53. 12. 53. 1. 1. 12. 12. 12.
1. 1. 12. 53. 53. 53. 12. 12. 12. 12. 12. 1. 12. 12. 1. 12. 12. 53.
12. 53. 1. 12. 1. 53. 53. 12. 12. 12. 12. 1. 1. 1. 12. 12. 1. 1.
12. 12. 53. 53. 53. 12. 12. 53. 53. 12. 12. 12. 12. 12. 12. 1. 53. 1.
53. 12. 12. 53. 53. 1. 1. 1. 53. 12. 1. 1. 53. 53. 53. 12. 12. 12.
12. 12. 12. 12. 1. 12. 1. 12. 12. 12.]]
>>> with Resource(file) as res:
>>> wspd = res['wind_speed', :, [2, 3]]
>>>
>>> wspd
[[12. 12.]
[12. 12.]
[12. 12.]
...
[14. 14.]
[15. 15.]
[15. 15.]]
"""
SCALE_ATTR = 'scale_factor'
ADD_ATTR = 'add_offset'
UNIT_ATTR = 'units'
def __init__(self, h5_file, unscale=True, str_decode=True, group=None,
hsds=False, hsds_kwargs=None):
"""
Parameters
----------
h5_file : str
Path to .h5 resource file
unscale : bool, optional
Boolean flag to automatically unscale variables on extraction,
by default True
str_decode : bool, optional
Boolean flag to decode the bytestring meta data into normal
strings. Setting this to False will speed up the meta data read,
by default True
group : str, optional
Group within .h5 resource file to open, by default None
hsds : bool, optional
Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS
behind HSDS, by default False
hsds_kwargs : dict, optional
Dictionary of optional kwargs for h5pyd, e.g., bucket, username,
password, by default None
"""
super().__init__(h5_file, unscale=unscale, str_decode=str_decode,
group=group, mode='r', hsds=hsds,
hsds_kwargs=hsds_kwargs)