# -*- coding: utf-8 -*-
"""
Classes to handle resource data stored over multiple files
"""
import os
from glob import glob
from itertools import chain
from fnmatch import fnmatch
import numpy as np
import pandas as pd
from rex.renewable_resource import (
NSRDB,
SolarResource,
WaveResource,
WindResource,
)
from rex.resource import Resource, BaseDatasetIterable
from rex.utilities.exceptions import FileInputError
from rex.utilities.parse_keys import parse_keys, parse_slice
class MultiTimeH5:
"""
Class to handle h5 Resources stored over multiple temporal files
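    Examples
    --------
    A minimal sketch using the same test-data pattern as the
    `MultiTimeResource` examples below:
    >>> path = '$TESTDATADIR/nsrdb/ri_100_nsrdb_*.h5'
    >>> mt = MultiTimeH5(path)
    >>> ti = mt.time_index  # combined DatetimeIndex across all files
    >>> mt.close()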
"""
def __init__(self, h5_path, res_cls=Resource, hsds=False, hsds_kwargs=None,
**res_cls_kwargs):
"""
Parameters
----------
h5_path : str | list
Unix shell style pattern path with * wildcards to multi-file
resource file sets. Files must have the same coordinates
but can have different datasets or time indexes. Can also be
            an explicit list of multi-time files, which themselves can
contain * wildcards.
res_cls : obj
Resource class to use to open and access resource data
hsds : bool
Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS
behind HSDS
hsds_kwargs : dict, optional
Dictionary of optional kwargs for h5pyd, e.g., bucket, username,
password, by default None
res_cls_kwargs : dict, optional
Kwargs for `res_cls`
"""
self.h5_path = h5_path
self._file_paths = self._get_file_paths(h5_path, hsds=hsds,
hsds_kwargs=hsds_kwargs)
res_cls_kwargs.update({'hsds': hsds})
self._h5_map = self._map_file_instances(self._file_paths,
res_cls=res_cls,
**res_cls_kwargs)
self._datasets = None
self._shape = None
self._time_index = None
self._time_slice_map = []
def __repr__(self):
msg = ("{} for {}:\n Contains data from {} files"
.format(self.__class__.__name__, self.h5_path, len(self)))
return msg
def __getitem__(self, file):
fn_fp_map = {os.path.basename(fp): fp for fp in self._file_paths}
if file in self._h5_map['fp'].values:
iloc = np.where(self._h5_map['fp'] == file)[0][0]
h5 = self._h5_map.at[iloc, 'h5']
elif file in fn_fp_map:
iloc = np.where(self._h5_map['fp'] == fn_fp_map[file])[0][0]
h5 = self._h5_map.at[iloc, 'h5']
else:
raise ValueError('{} is invalid, must be one of: {}'
.format(file, self._file_paths))
return h5
@property
def attrs(self):
"""
Global .h5 file attributes sourced from first .h5 file
Returns
-------
attrs : dict
"""
attrs = dict(self.h5.attrs)
return attrs
@property
def files(self):
"""
Available file paths
Returns
-------
list
"""
return sorted(self._file_paths)
@property
def h5_files(self):
"""
.h5 files data is being sourced from
Returns
-------
list
"""
return sorted(self._h5_map['fp'])
@property
def h5(self):
"""
        Open Resource instance for the first .h5 file
        Returns
        -------
        Resource
"""
return self._h5_map['h5'].values[0]
@property
def datasets(self):
"""
Available datasets
Returns
-------
list
"""
if self._datasets is None:
all_dsets = self._h5_map['dsets'].values.tolist()
dsets = [d for sub in all_dsets for d in sub]
self._datasets = list(set(dsets))
return self._datasets
@property
def resource_datasets(self):
"""
Available resource datasets
Returns
-------
list
"""
res_dsets = [ds for ds in self.datasets
if ds not in ['meta', 'time_index', 'coordinates']]
return res_dsets
@property
def shape(self):
"""
Dataset shape (time, sites)
Returns
-------
tuple
"""
if self._shape is None:
self._shape = (len(self.time_index), self.h5.shape[1])
return self._shape
@property
def time_index(self):
"""
        Combined datetime index across all files
        Returns
        -------
        pandas.DatetimeIndex
"""
if self._time_index is None:
time_slice_map = []
for _, row in self._h5_map.iterrows():
h5 = row['h5']
fp = row['fp']
ti = h5.time_index
time_slice_map.append(np.full(len(ti), os.path.basename(fp)))
if self._time_index is None:
self._time_index = ti
else:
self._time_index = self._time_index.append(ti)
if len(self._time_index) != len(np.unique(self._time_index)):
unique, duplicates = np.unique(self._time_index,
return_counts=True)
duplicates = np.where(duplicates > 1)[0]
duplicates = unique[duplicates]
msg = ('The combined time_index has {} duplicate values:\n{}'
.format(len(duplicates), duplicates))
raise RuntimeError(msg)
self._time_slice_map = np.concatenate(time_slice_map, axis=0)
return self._time_index
@staticmethod
def _get_hsds_file_paths(h5_path, hsds_kwargs=None):
"""
Get a list of h5 filepaths matching the h5_path specification from HSDS
Parameters
----------
h5_path : str
Unix shell style pattern path with * wildcards to multi-file
resource file sets. Files must have the same coordinates
but can have different datasets or time indexes.
hsds_kwargs : dict, optional
Dictionary of optional kwargs for h5pyd, e.g., bucket, username,
password, by default None
Returns
-------
file_paths : list
List of filepaths for this handler to handle.
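        Examples
        --------
        A hypothetical HSDS pattern; this requires a live HSDS endpoint,
        so the call and output are illustrative only:
        >>> MultiTimeH5._get_hsds_file_paths('/nrel/nsrdb/nsrdb_*.h5')
        ['/nrel/nsrdb/nsrdb_2012.h5', '/nrel/nsrdb/nsrdb_2013.h5']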
"""
import h5pyd
if hsds_kwargs is None:
hsds_kwargs = {}
hsds_dir = os.path.dirname(h5_path)
fn = os.path.basename(h5_path)
if '*' in hsds_dir:
msg = ('HSDS path specifications cannot handle wildcards in the '
'directory name! The directory must be explicit but the '
'filename can have wildcards. This HSDS h5_path input '
'cannot be used: {}'.format(h5_path))
raise FileNotFoundError(msg)
if not fn:
msg = ('h5_path must be a unix shell style pattern with '
'wildcard * in order to find files, but received '
'directory specification: {}'.format(h5_path))
raise FileInputError(msg)
with h5pyd.Folder(hsds_dir + '/', **hsds_kwargs) as f:
file_paths = [f'{hsds_dir}/{fn}' for fn in f
if fnmatch(f'{hsds_dir}/{fn}', h5_path)]
return file_paths
@classmethod
def _get_file_paths(cls, h5_path, hsds=False, hsds_kwargs=None):
"""
Get a file list based on the h5_path specification.
Parameters
----------
h5_path : str | list
Unix shell style pattern path with * wildcards to multi-file
resource file sets. Files must have the same coordinates
but can have different datasets or time indexes. Can also be
            an explicit list of multi-time files, which themselves can
contain * wildcards.
hsds : bool
Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS
behind HSDS
hsds_kwargs : dict, optional
Dictionary of optional kwargs for h5pyd, e.g., bucket, username,
password, by default None
Returns
-------
file_paths : list
List of filepaths for this handler to handle.
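        Examples
        --------
        A sketch of the accepted inputs; the paths are hypothetical:
        >>> MultiTimeH5._get_file_paths('/data/res_*.h5')
        ['/data/res_2012.h5', '/data/res_2013.h5']
        >>> MultiTimeH5._get_file_paths(['/data/res_2012.h5',
        ...                              '/data/res_2013.h5'])
        ['/data/res_2012.h5', '/data/res_2013.h5']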
"""
if hsds:
file_paths = cls._get_hsds_file_paths(h5_path,
hsds_kwargs=hsds_kwargs)
elif isinstance(h5_path, (list, tuple)):
file_paths = list(chain.from_iterable(glob(fp) for fp in h5_path))
for fp in file_paths:
assert os.path.exists(fp), 'Does not exist: {}'.format(fp)
elif os.path.isdir(h5_path):
msg = ('h5_path must be a unix shell style pattern with '
'wildcard * in order to find files, but received '
'directory specification: {}'.format(h5_path))
raise FileInputError(msg)
elif isinstance(h5_path, str):
file_paths = glob(h5_path)
if not any(file_paths):
msg = ('Could not find any file paths with pattern: {}'
.format(h5_path))
raise FileInputError(msg)
return file_paths
@staticmethod
def _map_file_instances(file_paths, res_cls=Resource, **res_cls_kwargs):
"""
Open all .h5 files and map the open h5py instances to the
associated file paths
Parameters
----------
file_paths : list
            List of filepaths for this handler to handle.
        res_cls : obj
            Resource class to use to open and access resource data
        res_cls_kwargs : dict
            Kwargs for `res_cls`
Returns
-------
h5_map : pd.DataFrame
DataFrame mapping file paths to open resource instances and
            datasets per file (columns: fp, h5, t0, t1, and dsets)
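        Examples
        --------
        A hypothetical two-file mapping; note the rows are sorted by each
        file's start time (illustrative only):
        >>> h5_map = MultiTimeH5._map_file_instances(
        ...     ['/data/res_2013.h5', '/data/res_2012.h5'])
        >>> h5_map['fp'].tolist()
        ['/data/res_2012.h5', '/data/res_2013.h5']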
"""
h5_map = pd.DataFrame({'fp': file_paths, 'h5': None,
't0': None, 't1': None})
for i, f_path in enumerate(file_paths):
h5_map.at[i, 'h5'] = res_cls(f_path, **res_cls_kwargs)
h5_map.at[i, 't0'] = h5_map.at[i, 'h5'].time_index.values[0]
            h5_map.at[i, 't1'] = h5_map.at[i, 'h5'].time_index.values[-1]
h5_map['dsets'] = [h5.dsets for h5 in h5_map['h5'].values]
h5_map = h5_map.sort_values('t0').reset_index(drop=True)
return h5_map
@staticmethod
def _check_time_slice(time_slice):
"""
Check to see if time positions can be represented as a slice
Parameters
----------
time_slice : ndarray | list
List of temporal positions
Returns
-------
time_slice : ndarray | list | slice
Slice covering range of positions to extract if possible
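        Examples
        --------
        Contiguous positions collapse to a slice; gapped positions pass
        through unchanged:
        >>> MultiTimeH5._check_time_slice([2, 3, 4])
        slice(2, 5, None)
        >>> MultiTimeH5._check_time_slice([0, 2, 4])
        [0, 2, 4]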
"""
s = time_slice[0]
e = time_slice[-1] + 1
if (e - s) == len(time_slice):
time_slice = slice(s, e, None)
return time_slice
def _map_time_slice(self, time_slice):
"""
Map timeslices to files
Parameters
----------
        time_slice : int | list | slice
            Temporal positions (along axis 0) to map to files
Returns
-------
file_times : dict
Dictionary mapping files to the time_slices to extract
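        Examples
        --------
        A hypothetical handler `mt` spanning two 8760-step yearly files
        (illustrative only):
        >>> mt._map_time_slice(slice(8750, 8770))
        {'res_2012.h5': slice(8750, 8760, None),
         'res_2013.h5': slice(0, 10, None)}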
"""
time_index = self.time_index[time_slice]
files = self._time_slice_map[time_slice]
file_times = {}
for file in np.unique(files):
ti = self[file].time_index
file_slice = np.where(ti.isin(time_index))[0]
file_slice = self._check_time_slice(file_slice)
file_times[file] = file_slice
return file_times
def _get_ds(self, ds_name, ds_slice):
"""
Extract data from given dataset
Parameters
----------
ds_name : str
Variable dataset to be extracted
        ds_slice : tuple of int | list | slice
            Tuple describing slice of dataset array to extract
Returns
-------
out : ndarray
ndarray of variable timeseries data
If unscale, returned in native units else in scaled units
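        Examples
        --------
        Hypothetical extraction of the first ten timesteps at all 100
        sites of a handler `mt` (illustrative only):
        >>> out = mt._get_ds('ghi', (slice(0, 10), slice(None)))
        >>> out.shape
        (10, 100)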
"""
ds_slice = parse_slice(ds_slice)
out = []
time_slice = ds_slice[0]
if isinstance(time_slice, (int, np.integer)):
time_step = self.time_index[time_slice]
file = self._time_slice_map[time_slice]
time_index = self[file].time_index
time_slice = np.where(time_step == time_index)[0][0]
file_slice = (time_slice, ) + ds_slice[1:]
out = self[file]._get_ds(ds_name, file_slice)
else:
file_times = self._map_time_slice(ds_slice[0])
for file, time_slice in file_times.items():
file_slice = (time_slice, ) + ds_slice[1:]
out.append(self[file]._get_ds(ds_name, file_slice))
out = np.concatenate(out, axis=0)
return out
def close(self):
"""
Close all h5py.File instances
"""
for f in self._h5_map['h5']:
f.close()
class MultiTimeResource(BaseDatasetIterable):
"""
    Class to handle resource data stored temporally across multiple
.h5 files
Examples
--------
Extracting the resource's Datetime Index
>>> path = '$TESTDATADIR/nsrdb/ri_100_nsrdb_*.h5'
>>> with MultiTimeResource(path) as res:
>>> ti = res.time_index
>>>
>>> ti
DatetimeIndex(['2012-01-01 00:00:00', '2012-01-01 00:30:00',
'2012-01-01 01:00:00', '2012-01-01 01:30:00',
'2012-01-01 02:00:00', '2012-01-01 02:30:00',
'2012-01-01 03:00:00', '2012-01-01 03:30:00',
'2012-01-01 04:00:00', '2012-01-01 04:30:00',
...
'2013-12-31 19:00:00', '2013-12-31 19:30:00',
'2013-12-31 20:00:00', '2013-12-31 20:30:00',
'2013-12-31 21:00:00', '2013-12-31 21:30:00',
'2013-12-31 22:00:00', '2013-12-31 22:30:00',
'2013-12-31 23:00:00', '2013-12-31 23:30:00'],
dtype='datetime64[ns]', length=35088, freq=None)
NOTE: time_index covers data from 2012 and 2013
>>> with MultiTimeResource(path) as res:
>>> print(res.h5_files)
['/Users/mrossol/Git_Repos/rex/tests/data/nsrdb/ri_100_nsrdb_2012.h5',
'/Users/mrossol/Git_Repos/rex/tests/data/nsrdb/ri_100_nsrdb_2013.h5']
Data slicing works the same as with "Resource" except axis 0 now covers
2012 and 2013
>>> with MultiTimeResource(path) as res:
>>> temperature = res['air_temperature']
>>>
>>> temperature
[[ 4. 5. 5. ... 4. 3. 4.]
[ 4. 4. 5. ... 4. 3. 4.]
[ 4. 4. 5. ... 4. 3. 4.]
...
[-1. -1. 0. ... -2. -3. -2.]
[-1. -1. 0. ... -2. -3. -2.]
[-1. -1. 0. ... -2. -3. -2.]]
>>> temperature.shape
(35088, 100)
>>> with MultiTimeResource(path) as res:
>>> temperature = res['air_temperature', ::100] # every 100th timestep
>>>
>>> temperature
[[ 4. 5. 5. ... 4. 3. 4.]
[ 1. 1. 2. ... 0. 0. 1.]
[-2. -1. -1. ... -2. -4. -2.]
...
[-3. -2. -2. ... -3. -4. -3.]
[ 0. 0. 1. ... 0. -1. 0.]
[ 3. 3. 3. ... 2. 2. 3.]]
>>> temperature.shape
(351, 100)
"""
def __init__(self, h5_path, unscale=True, str_decode=True,
res_cls=Resource, hsds=False, hsds_kwargs=None):
"""
Parameters
----------
h5_path : str | list
Unix shell style pattern path with * wildcards to multi-file
resource file sets. Files must have the same coordinates
but can have different datasets or time indexes. Can also be
            an explicit list of multi-time files, which themselves can
contain * wildcards.
unscale : bool
Boolean flag to automatically unscale variables on extraction
str_decode : bool
Boolean flag to decode the bytestring meta data into normal
strings. Setting this to False will speed up the meta data read.
res_cls : obj
            Resource handler to use to open individual .h5 files
hsds : bool, optional
Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS
behind HSDS, by default False
hsds_kwargs : dict, optional
Dictionary of optional kwargs for h5pyd, e.g., bucket, username,
password, by default None
"""
self.h5_path = h5_path
self._time_index = None
        # Open all .h5 files and map them to their time indexes
cls_kwargs = {'unscale': unscale, 'str_decode': str_decode,
'hsds': hsds, 'hsds_kwargs': hsds_kwargs}
self._h5 = MultiTimeH5(self.h5_path, res_cls=res_cls, **cls_kwargs)
self.h5_files = self._h5.h5_files
self.h5_file = self.h5_files[0]
def __repr__(self):
msg = "{} for {}".format(self.__class__.__name__, self.h5_path)
return msg
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
if type is not None:
raise
def __len__(self):
return len(self.h5.time_index)
def __getitem__(self, keys):
ds, ds_slice = parse_keys(keys)
if ds.endswith('time_index'):
out = self.h5.h5._get_time_index(ds_slice)
elif ds.endswith('meta'):
out = self.h5.h5._get_meta(ds, ds_slice)
elif ds.endswith('coordinates'):
out = self.h5.h5._get_coords(ds, ds_slice)
else:
out = self.h5._get_ds(ds, ds_slice)
return out
def __contains__(self, dset):
return dset in self.datasets
@property
def h5(self):
"""
Open class instance that handles all .h5 files that data is to
be extracted from
Returns
-------
h5 : MultiTimeH5 | MultiYearH5
"""
return self._h5
@property
def datasets(self):
"""
Datasets available
Returns
-------
list
"""
return self.h5.datasets
@property
def dsets(self):
"""
Datasets available
Returns
-------
list
"""
return self.datasets
@property
def resource_datasets(self):
"""
Available resource datasets
Returns
-------
list
"""
return self.h5.resource_datasets
@property
def res_dsets(self):
"""
Available resource datasets
Returns
-------
list
"""
return self.resource_datasets
@property
def shape(self):
"""
Resource shape (timesteps, sites)
shape = (len(time_index), len(meta))
Returns
-------
shape : tuple
"""
return self.h5.shape
@property
def meta(self):
"""
Resource meta data DataFrame
Returns
-------
meta : pandas.DataFrame
"""
return self.h5.h5.meta
@property
def time_index(self):
"""
Resource DatetimeIndex
Returns
-------
time_index : pandas.DatetimeIndex
"""
return self.h5.time_index
@property
def lat_lon(self):
"""
Extract (latitude, longitude) pairs
Returns
-------
lat_lon : ndarray
"""
return self.h5.h5.lat_lon
@property
def coordinates(self):
"""
Coordinates: (lat, lon) pairs
Returns
-------
lat_lon : ndarray
"""
return self.lat_lon
@property
def global_attrs(self):
"""
Global (file) attributes
Returns
-------
global_attrs : dict
"""
return self.get_attrs()
@property
def attrs(self):
"""
Dictionary of all dataset attributes
Returns
-------
attrs : dict
"""
return self.h5.h5.attrs
@property
def shapes(self):
"""
Dictionary of all dataset shapes
Returns
-------
shapes : dict
"""
return self.h5.h5.shapes
@property
def dtypes(self):
"""
Dictionary of all dataset dtypes
Returns
-------
dtypes : dict
"""
return self.h5.h5.dtypes
@property
def chunks(self):
"""
Dictionary of all dataset chunk sizes
Returns
-------
chunks : dict
"""
return self.h5.h5.chunks
@property
def scale_factors(self):
"""
Dictionary of all dataset scale factors
Returns
-------
scale_factors : dict
"""
return self.h5.h5.scale_factors
@property
def units(self):
"""
Dictionary of all dataset units
Returns
-------
units : dict
"""
return self.h5.h5.units
def get_attrs(self, dset=None):
"""
Get h5 attributes either from file or dataset
Parameters
----------
dset : str
Dataset to get attributes for, if None get file (global) attributes
Returns
-------
attrs : dict
Dataset or file attributes
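        Examples
        --------
        Hypothetical lookups; `res` is an open handler and 'ghi' a
        dataset assumed to exist (illustrative only):
        >>> attrs = res.get_attrs()            # global file attributes
        >>> attrs = res.get_attrs(dset='ghi')  # single dataset attributes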
"""
return self.h5.h5.get_attrs(dset=dset)
def get_dset_properties(self, dset):
"""
Get dataset properties (shape, dtype, chunks)
Parameters
----------
dset : str
Dataset to get scale factor for
Returns
-------
shape : tuple
Dataset array shape
dtype : str
Dataset array dtype
chunks : tuple
Dataset chunk size
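        Examples
        --------
        A hypothetical dataset's properties (illustrative only):
        >>> res.get_dset_properties('ghi')
        ((17568, 100), dtype('uint16'), (2688, 372))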
"""
return self.h5.h5.get_dset_properties(dset)
def get_scale_factor(self, dset):
"""
Get dataset scale factor
Parameters
----------
dset : str
Dataset to get scale factor for
Returns
-------
float
Dataset scale factor, used to unscale int values to floats
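        Examples
        --------
        Hypothetical scale factor lookup (illustrative only):
        >>> res.get_scale_factor('ghi')
        1.0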
"""
return self.h5.h5.get_scale_factor(dset)
def get_units(self, dset):
"""
Get dataset units
Parameters
----------
dset : str
Dataset to get units for
Returns
-------
str
Dataset units, None if not defined
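        Examples
        --------
        Hypothetical units lookup (illustrative only):
        >>> res.get_units('ghi')
        'W/m2'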
"""
return self.h5.h5.get_units(dset)
def close(self):
"""
Close h5 instance
"""
self._h5.close()
class MultiTimeSolarResource(MultiTimeResource):
"""
    Class to handle solar resource data stored temporally across multiple .h5
files
"""
def __init__(self, h5_path, unscale=True, str_decode=True, hsds=False,
hsds_kwargs=None):
"""
Parameters
----------
h5_path : str | list
Unix shell style pattern path with * wildcards to multi-file
resource file sets. Files must have the same coordinates
but can have different datasets or time indexes. Can also be
            an explicit list of multi-time files, which themselves can
contain * wildcards.
unscale : bool
Boolean flag to automatically unscale variables on extraction
str_decode : bool
Boolean flag to decode the bytestring meta data into normal
strings. Setting this to False will speed up the meta data read.
hsds : bool, optional
Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS
behind HSDS, by default False
hsds_kwargs : dict, optional
Dictionary of optional kwargs for h5pyd, e.g., bucket, username,
password, by default None
"""
super().__init__(h5_path, unscale=unscale, hsds=hsds,
hsds_kwargs=hsds_kwargs, str_decode=str_decode,
res_cls=SolarResource)
class MultiTimeNSRDB(MultiTimeResource):
"""
    Class to handle NSRDB data stored temporally across multiple .h5
files
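    Examples
    --------
    A minimal sketch; `dni` is a standard NSRDB dataset:
    >>> path = '$TESTDATADIR/nsrdb/ri_100_nsrdb_*.h5'
    >>> with MultiTimeNSRDB(path) as res:
    >>>     dni = res['dni']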
"""
PREFIX = 'nsrdb'
def __init__(self, h5_path, unscale=True, str_decode=True, hsds=False,
hsds_kwargs=None):
"""
Parameters
----------
h5_path : str | list
Unix shell style pattern path with * wildcards to multi-file
resource file sets. Files must have the same coordinates
but can have different datasets or time indexes. Can also be
            an explicit list of multi-time files, which themselves can
contain * wildcards.
unscale : bool
Boolean flag to automatically unscale variables on extraction
str_decode : bool
Boolean flag to decode the bytestring meta data into normal
strings. Setting this to False will speed up the meta data read.
hsds : bool, optional
Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS
behind HSDS, by default False
hsds_kwargs : dict, optional
Dictionary of optional kwargs for h5pyd, e.g., bucket, username,
password, by default None
"""
super().__init__(h5_path, unscale=unscale, hsds=hsds,
hsds_kwargs=hsds_kwargs, str_decode=str_decode,
res_cls=NSRDB)
class MultiTimeWindResource(MultiTimeResource):
"""
    Class to handle wind resource data stored temporally across multiple .h5
files
"""
PREFIX = 'wtk'
def __init__(self, h5_path, unscale=True, str_decode=True, hsds=False,
hsds_kwargs=None):
"""
Parameters
----------
h5_path : str | list
Unix shell style pattern path with * wildcards to multi-file
resource file sets. Files must have the same coordinates
but can have different datasets or time indexes. Can also be
            an explicit list of multi-time files, which themselves can
contain * wildcards.
unscale : bool
Boolean flag to automatically unscale variables on extraction
str_decode : bool
Boolean flag to decode the bytestring meta data into normal
strings. Setting this to False will speed up the meta data read.
hsds : bool, optional
Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS
behind HSDS, by default False
hsds_kwargs : dict, optional
Dictionary of optional kwargs for h5pyd, e.g., bucket, username,
password, by default None
"""
super().__init__(h5_path, unscale=unscale, hsds=hsds,
hsds_kwargs=hsds_kwargs, str_decode=str_decode,
res_cls=WindResource)
class MultiTimeWaveResource(MultiTimeResource):
"""
    Class to handle wave resource data stored temporally across multiple .h5
files
"""
def __init__(self, h5_path, unscale=True, str_decode=True, hsds=False,
hsds_kwargs=None):
"""
Parameters
----------
h5_path : str | list
Unix shell style pattern path with * wildcards to multi-file
resource file sets. Files must have the same coordinates
but can have different datasets or time indexes. Can also be
            an explicit list of multi-time files, which themselves can
contain * wildcards.
unscale : bool
Boolean flag to automatically unscale variables on extraction
str_decode : bool
Boolean flag to decode the bytestring meta data into normal
strings. Setting this to False will speed up the meta data read.
hsds : bool, optional
Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS
behind HSDS, by default False
hsds_kwargs : dict, optional
Dictionary of optional kwargs for h5pyd, e.g., bucket, username,
password, by default None
"""
super().__init__(h5_path, unscale=unscale, hsds=hsds,
hsds_kwargs=hsds_kwargs, str_decode=str_decode,
res_cls=WaveResource)