# -*- coding: utf-8 -*-
"""
Exclusion layers handler
"""
import json
import logging
import numpy as np
from rex.multi_file_resource import MultiFileResource
from rex.resource import Resource
from rex.utilities.parse_keys import parse_keys
from reV.utilities.exceptions import HandlerKeyError, MultiFileExclusionError
logger = logging.getLogger(__name__)
LATITUDE, LONGITUDE = "latitude", "longitude"
[docs]class ExclusionLayers:
"""
Handler of .h5 file and techmap for Exclusion Layers
"""
def __init__(self, h5_file, hsds=False):
"""
Parameters
----------
h5_file : str | list | tuple
.h5 file containing exclusion layers and techmap,
or a list of h5 files
hsds : bool
Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS
behind HSDS
"""
self.h5_file = h5_file
if isinstance(h5_file, str):
self._h5 = Resource(h5_file, hsds=hsds)
elif isinstance(h5_file, (list, tuple)):
self._h5 = MultiFileResource(h5_file, check_files=False)
self._preflight_multi_file()
else:
msg = ('Expected str, list, or tuple for h5_file input but '
'received {}'.format(type(h5_file)))
logger.error(msg)
raise TypeError(msg)
self._iarr = None
def __repr__(self):
msg = "{} for {}".format(self.__class__.__name__, self.h5_file)
return msg
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
if type is not None:
raise
def __len__(self):
return len(self.layers)
def __getitem__(self, keys):
ds, ds_slice = parse_keys(keys)
if ds.lower().startswith('lat'):
out = self._get_latitude(*ds_slice)
elif ds.lower().startswith('lon'):
out = self._get_longitude(*ds_slice)
else:
out = self._get_layer(ds, *ds_slice)
return out
def __contains__(self, layer):
return layer in self.layers
def _preflight_multi_file(self):
"""Run simple multi-file exclusion checks."""
lat_shape = self.h5.shapes[LATITUDE]
lon_shape = self.h5.shapes[LONGITUDE]
for layer in self.layers:
lshape = self.h5.shapes[layer]
lshape = lshape[1:] if len(lshape) > 2 else lshape
if lshape != lon_shape or lshape != lat_shape:
msg = ('Shape of layer "{}" is {} which does not match '
'latitude and longitude shapes of {} and {}. '
'Check your exclusion file inputs: {}'
.format(layer, self.h5.shapes[layer],
lat_shape, lon_shape, self.h5._h5_files))
logger.error(msg)
raise MultiFileExclusionError(msg)
check_attrs = ('height', 'width', 'crs', 'transform')
base_profile = {}
for fp in self.h5_file:
with ExclusionLayers(fp) as f:
if not base_profile:
base_profile = f.profile
else:
for attr in check_attrs:
if attr not in base_profile or attr not in f.profile:
msg = ('Multi-file exclusion inputs from {} '
'dont have profiles with height, width, '
'crs, and transform: {} and {}'
.format(self.h5_file, base_profile,
f.profile))
logger.error(msg)
raise MultiFileExclusionError(msg)
base_attr = base_profile[attr]
file_attr = f.profile[attr]
attrs_are_str = (isinstance(base_attr, str)
and isinstance(file_attr, str))
if attr == 'crs' and attrs_are_str:
attrs_match = (set(base_attr.split(' '))
== set(file_attr.split(' ')))
else:
attrs_match = base_profile[attr] == f.profile[attr]
if not attrs_match:
msg = ('Multi-file exclusion inputs from {} '
'dont have matching "{}": {} and {}'
.format(self.h5_file, attr,
base_profile[attr],
f.profile[attr]))
logger.error(msg)
raise MultiFileExclusionError(msg)
[docs] def close(self):
"""
Close h5 instance
"""
self._h5.close()
@property
def h5(self):
"""
Open h5py File instance.
Returns
-------
h5 : rex.MultiFileResource | rex.Resource
"""
return self._h5
@property
def iarr(self):
"""Get an array of 1D index values for the flattened h5 excl extent.
Returns
-------
iarr : np.ndarray
Uint array with same shape as exclusion extent, representing the 1D
index values if the geotiff extent was flattened
(with default flatten order 'C')
"""
if self._iarr is None:
N = self.shape[0] * self.shape[1]
self._iarr = np.arange(N, dtype=np.uint32)
self._iarr = self._iarr.reshape(self.shape)
return self._iarr
@property
def profile(self):
"""
GeoTiff profile for exclusions
Returns
-------
profile : dict
"""
return json.loads(self.h5.global_attrs['profile'])
@property
def crs(self):
"""
GeoTiff projection crs
Returns
-------
str
"""
return self.profile['crs']
@property
def pixel_area(self):
"""Get pixel area in km2 from the transform profile of the excl file.
Returns
-------
area : float
Exclusion pixel area in km2. Will return None if the
appropriate transform attribute is not found.
"""
area = None
if 'transform' in self.profile:
transform = self.profile['transform']
area = np.abs(transform[0] * transform[4])
area /= 1000 ** 2
return area
@property
def layers(self):
"""
Available exclusions layers
Returns
-------
layers : list
"""
layers = self.h5.datasets
return layers
@property
def shape(self):
"""
Exclusion shape (latitude, longitude)
Returns
-------
shape : tuple
"""
shape = self.h5.attrs.get('shape', None)
if shape is None:
shape = self.h5.shapes[LATITUDE]
return tuple(shape)
@property
def chunks(self):
"""
Exclusion layers chunks default chunk size
Returns
-------
chunks : tuple | None
Chunk size of exclusion layers
"""
chunks = self.h5.attrs.get('chunks', None)
if chunks is None:
chunks = self.h5.chunks[LATITUDE]
return chunks
@property
def latitude(self):
"""
Latitude coordinates array
Returns
-------
ndarray
"""
return self[LATITUDE]
@property
def longitude(self):
"""
Longitude coordinates array
Returns
-------
ndarray
"""
return self[LONGITUDE]
[docs] def get_layer_profile(self, layer):
"""
Get profile for a specific exclusion layer
Parameters
----------
layer : str
Layer to get profile for
Returns
-------
profile : dict | None
GeoTiff profile for single exclusion layer
"""
profile = self.h5.get_attrs(dset=layer).get('profile', None)
if profile is not None:
profile = json.loads(profile)
return profile
[docs] def get_layer_crs(self, layer):
"""
Get crs for a specific exclusion layer
Parameters
----------
layer : str
Layer to get profile for
Returns
-------
crs : str | None
GeoTiff projection crs
"""
profile = self.get_layer_profile(layer)
if profile is not None:
crs = profile['crs']
else:
crs = None
return crs
[docs] def get_layer_values(self, layer):
"""
Get values for given layer in Geotiff format (bands, y, x)
Parameters
----------
layer : str
Layer to get values for
Returns
-------
values : ndarray
GeoTiff values for single exclusion layer
"""
values = self.h5[layer]
return values
[docs] def get_layer_description(self, layer):
"""
Get description for given layer
Parameters
----------
layer : str
Layer to get description for
Returns
-------
description : str
Description of layer
"""
description = self.h5.get_attrs(dset=layer).get('description', None)
return description
[docs] def get_nodata_value(self, layer):
"""
Get the nodata value for a given layer
Parameters
----------
layer : str
Layer to get nodata value for
Returns
-------
nodata : int | float | None
nodata value for layer or None if not found
"""
profile = self.get_layer_profile(layer)
nodata = profile.get('nodata', None)
return nodata
def _get_latitude(self, *ds_slice):
"""
Extract latitude coordinates
Parameters
----------
ds_slice : tuple of int | list | slice
Pandas slicing describing which sites and columns to extract
Returns
-------
lat : ndarray
Latitude coordinates
"""
if LATITUDE not in self.h5:
msg = ('"latitude" is missing from {}'
.format(self.h5_file))
logger.error(msg)
raise HandlerKeyError(msg)
ds_slice = (LATITUDE, ) + ds_slice
lat = self.h5[ds_slice]
return lat
def _get_longitude(self, *ds_slice):
"""
Extract longitude coordinates
Parameters
----------
ds_slice : tuple of int | list | slice
Pandas slicing describing which sites and columns to extract
Returns
-------
lon : ndarray
Longitude coordinates
"""
if LONGITUDE not in self.h5:
msg = ('"longitude" is missing from {}'
.format(self.h5_file))
logger.error(msg)
raise HandlerKeyError(msg)
ds_slice = (LONGITUDE, ) + ds_slice
lon = self.h5[ds_slice]
return lon
def _get_layer(self, layer_name, *ds_slice):
"""
Extract data from given dataset
Parameters
----------
layer_name : str
Exclusion layer to extract
ds_slice : tuple of int | list | slice
tuple describing slice of layer array to extract
Returns
-------
layer_data : ndarray
Array of exclusion data
"""
if layer_name not in self.layers:
msg = ('{} not in available layers: {}'
.format(layer_name, self.layers))
logger.error(msg)
raise HandlerKeyError(msg)
shape = self.h5.get_dset_properties(layer_name)[0]
if len(shape) == 3:
ds_slice = (layer_name, 0) + ds_slice
else:
ds_slice = (layer_name, ) + ds_slice
layer_data = self.h5[ds_slice]
return layer_data