"""Custom sup3r solar module. This primarily converts GAN output clearsky ratio
to GHI, DNI, and DHI using NSRDB data and utility modules like DISC
Note that clearsky_ratio is assumed to be clearsky ghi ratio and is calculated
as daily average GHI / daily average clearsky GHI.
"""
import logging
import os
import numpy as np
import pandas as pd
from farms.disc import disc
from farms.utilities import calc_dhi, dark_night
from rex import MultiTimeResource, Resource
from rex.utilities.fun_utils import get_fun_call_str
from scipy.spatial import KDTree
from sup3r.postprocessing import OUTPUT_ATTRS, RexOutputs
from sup3r.preprocessing.utilities import expand_paths
from sup3r.utilities import ModuleName
from sup3r.utilities.cli import BaseCLI
logger = logging.getLogger(__name__)
[docs]
class Solar:
"""Custom sup3r solar module. This primarily converts GAN output clearsky
ratio to GHI, DNI, and DHI using NSRDB data and utility modules like
DISC"""
def __init__(
self,
sup3r_fps,
nsrdb_fp,
t_slice=slice(None),
tz=-7,
agg_factor=1,
nn_threshold=0.5,
cloud_threshold=0.99,
):
"""
Parameters
----------
sup3r_fps : str | list
Full .h5 filepath(s) to one or more sup3r GAN output .h5 chunk
files containing clearsky_ratio, time_index, and meta. These files
must have the same meta data but can be sequential and ordered
temporal chunks. The data in this file has been rolled by tz
to a local time (assumed that the GAN was trained on local time
solar data) and will be converted back into UTC, so it's wise to
include some padding in the sup3r_fps file list.
nsrdb_fp : str
Filepath to NSRDB .h5 file containing clearsky_ghi, clearsky_dni,
clearsky_dhi data.
t_slice : slice
Slicing argument to slice the temporal axis of the sup3r_fps source
data after doing the tz roll to UTC but before returning the
irradiance variables. This can be used to effectively pad the solar
irradiance calculation in UTC time. For example, if sup3r_fps is 3
files each with 24 hours of data, t_slice can be slice(24, 48) to
only output the middle day of irradiance data, but padded by the
other two days for the UTC output.
tz : int
The timezone offset for the data in sup3r_fps. It is assumed that
the GAN is trained on data in local time and therefore the output
in sup3r_fps should be treated as local time. For example, -7 is
MST which is default for CONUS training data.
agg_factor : int
Spatial aggregation factor for nsrdb-to-GAN-meta e.g. the number of
NSRDB spatial pixels to average for a single sup3r GAN output site.
nn_threshold : float
The KDTree nearest neighbor threshold that determines how far the
sup3r GAN output data has to be from the NSRDB source data to get
irradiance=0. Note that is value is in decimal degrees which is a
very approximate way to determine real distance.
cloud_threshold : float
Clearsky ratio threshold below which the data is considered cloudy
and DNI is calculated using DISC.
"""
self.t_slice = t_slice
self.agg_factor = agg_factor
self.nn_threshold = nn_threshold
self.cloud_threshold = cloud_threshold
self.tz = tz
self._nsrdb_fp = nsrdb_fp
self._sup3r_fps = sup3r_fps
if isinstance(self._sup3r_fps, str):
self._sup3r_fps = [self._sup3r_fps]
logger.debug(
'Initializing solar module with sup3r files: {}'.format(
[os.path.basename(fp) for fp in self._sup3r_fps]
)
)
logger.debug(
'Initializing solar module with temporal slice: {}'.format(
self.t_slice
)
)
logger.debug(
'Initializing solar module with NSRDB source fp: {}'.format(
self._nsrdb_fp
)
)
self.gan_data = MultiTimeResource(self._sup3r_fps)
self.nsrdb = Resource(self._nsrdb_fp)
# cached variables
self._nsrdb_tslice = None
self._idnn = None
self._dist = None
self._sza = None
self._cs_ratio = None
self._ghi = None
self._dni = None
self._dhi = None
self.preflight()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
if exc_type is not None:
raise
[docs]
def preflight(self):
"""Run preflight checks on source data to make sure everything will
work together."""
assert 'clearsky_ratio' in self.gan_data.dsets
assert 'clearsky_ghi' in self.nsrdb.dsets
assert 'clearsky_dni' in self.nsrdb.dsets
assert 'solar_zenith_angle' in self.nsrdb.dsets
assert 'surface_pressure' in self.nsrdb.dsets
assert isinstance(self.nsrdb_tslice, slice)
delta = pd.Series(self.time_index[1:] - self.time_index[:-1])
delta = delta.mean().total_seconds()
msg = (
'Its assumed that the sup3r GAN output solar data will be '
'hourly but received time index: {}'.format(self.time_index)
)
assert delta == 3600, msg
[docs]
def close(self):
"""Close all internal file handlers"""
self.gan_data.close()
self.nsrdb.close()
@property
def idnn(self):
"""Get the nearest neighbor meta data indices from the NSRDB data that
correspond to the sup3r GAN data
Returns
-------
idnn : Union[np.ndarray, da.core.Array]
2D array of length (n_sup3r_sites, agg_factor) where the values are
meta data indices from the NSRDB.
"""
if self._idnn is None:
tree = KDTree(self.nsrdb.meta[['latitude', 'longitude']])
out = tree.query(self.gan_data.meta[['latitude', 'longitude']])
self._dist, self._idnn = out
if len(self._idnn.shape) == 1:
self._dist = np.expand_dims(self._dist, axis=1)
self._idnn = np.expand_dims(self._idnn, axis=1)
return self._idnn
@property
def dist(self):
"""Get the nearest neighbor distances from the sup3r GAN data sites to
the NSRDB nearest neighbors.
Returns
-------
dist : Union[np.ndarray, da.core.Array]
2D array of length (n_sup3r_sites, agg_factor) where the values are
decimal degree distances from the sup3r sites to the nsrdb nearest
neighbors.
"""
if self._dist is None:
_ = self.idnn
return self._dist
@property
def time_index(self):
"""Time index for the sup3r GAN output data but sliced by t_slice
Returns
-------
pd.DatetimeIndex
"""
return self.gan_data.time_index[self.t_slice]
@property
def out_of_bounds(self):
"""Get a boolean mask for the sup3r data that is out of bounds (too far
from the NSRDB data).
Returns
-------
out_of_bounds : Union[np.ndarray, da.core.Array]
1D boolean array with length == number of sup3r GAN sites. True if
the site is too far from the NSRDB.
"""
return (self.dist > self.nn_threshold).any(axis=1)
@property
def nsrdb_tslice(self):
"""Get the time slice of the NSRDB data corresponding to the sup3r GAN
output."""
if self._nsrdb_tslice is None:
doy_nsrdb = self.nsrdb.time_index.day_of_year
doy_gan = self.time_index.day_of_year
mask = doy_nsrdb.isin(doy_gan)
if mask.sum() == 0:
msg = (
'Time index intersection of the NSRDB time index and '
'sup3r GAN output has only {} common timesteps! '
'Something went wrong.\nNSRDB time index: \n{}\nSup3r '
'GAN output time index:\n{}'.format(
mask.sum(), self.nsrdb.time_index, self.time_index
)
)
logger.error(msg)
raise RuntimeError(msg)
ilocs = np.where(mask)[0]
t0, t1 = ilocs[0], ilocs[-1] + 1
ti_nsrdb = self.nsrdb.time_index
delta = (
pd.Series(ti_nsrdb[1:] - ti_nsrdb[:-1])[1:]
.mean()
.total_seconds()
)
step = int(3600 / delta)
self._nsrdb_tslice = slice(t0, t1, step)
logger.debug(
'Found nsrdb_tslice {} with corresponding '
'time index:\n\t{}'.format(
self._nsrdb_tslice,
self.nsrdb.time_index[self._nsrdb_tslice],
)
)
return self._nsrdb_tslice
@property
def clearsky_ratio(self):
"""Get the clearsky ghi ratio data from the GAN output, rolled from tz
to UTC.
Returns
-------
clearsky_ratio : Union[np.ndarray, da.core.Array]
2D array with shape (time, sites) in UTC.
"""
if self._cs_ratio is None:
self._cs_ratio = self.gan_data['clearsky_ratio']
self._cs_ratio = np.roll(self._cs_ratio, -self.tz, axis=0)
# if tz is negative, roll to utc is positive, and the beginning of
# the dataset is rolled over from the end and must be backfilled,
# otherwise you can get seams
self._cs_ratio[: -self.tz, :] = self._cs_ratio[-self.tz, :]
# apply temporal slicing of source data, see docstring on t_slice
# for more info
self._cs_ratio = self._cs_ratio[self.t_slice, :]
return self._cs_ratio
@property
def solar_zenith_angle(self):
"""Get the solar zenith angle (degrees)
Returns
-------
solar_zenith_angle : Union[np.ndarray, da.core.Array]
2D array with shape (time, sites) in UTC.
"""
if self._sza is None:
self._sza = self.get_nsrdb_data('solar_zenith_angle')
return self._sza
@property
def ghi(self):
"""Get the ghi (W/m2) based on the GAN output clearsky ratio +
clearsky_ghi in UTC.
Returns
-------
ghi : Union[np.ndarray, da.core.Array]
2D array with shape (time, sites) in UTC.
"""
if self._ghi is None:
logger.debug('Calculating GHI.')
nsrdb_cs_ghi = self.get_nsrdb_data('clearsky_ghi')
self._ghi = nsrdb_cs_ghi * self.clearsky_ratio
self._ghi[:, self.out_of_bounds] = 0
return self._ghi
@property
def dni(self):
"""Get the dni (W/m2) which is clearsky dni (from NSRDB) when the GAN
output is clear (clearsky_ratio is > cloud_threshold), and calculated
from the DISC model when cloudy
Returns
-------
dni : Union[np.ndarray, da.core.Array]
2D array with shape (time, sites) in UTC.
"""
if self._dni is None:
logger.debug('Calculating DNI.')
self._dni = self.get_nsrdb_data('clearsky_dni')
pressure = self.get_nsrdb_data('surface_pressure')
doy = self.time_index.day_of_year.values
cloudy_dni = disc(
self.ghi, self.solar_zenith_angle, doy, pressure=pressure
)
cloudy_dni = np.minimum(self._dni, cloudy_dni)
self._dni[self.cloud_mask] = cloudy_dni[self.cloud_mask]
self._dni = dark_night(self._dni, self.solar_zenith_angle)
self._dni[:, self.out_of_bounds] = 0
return self._dni
@property
def dhi(self):
"""Get the dhi (W/m2) which is calculated based on the simple
relationship between GHI, DNI and solar zenith angle.
Returns
-------
dhi : Union[np.ndarray, da.core.Array]
2D array with shape (time, sites) in UTC.
"""
if self._dhi is None:
logger.debug('Calculating DHI.')
self._dhi, self._dni = calc_dhi(
self.dni, self.ghi, self.solar_zenith_angle
)
self._dhi = dark_night(self._dhi, self.solar_zenith_angle)
self._dhi[:, self.out_of_bounds] = 0
return self._dhi
@property
def cloud_mask(self):
"""Get the cloud mask (True if cloudy) based on the GAN output clearsky
ratio in UTC.
Returns
-------
cloud_mask : Union[np.ndarray, da.core.Array]
2D array with shape (time, sites) in UTC.
"""
return self.clearsky_ratio < self.cloud_threshold
[docs]
def get_nsrdb_data(self, dset):
"""Get an NSRDB dataset with spatial index corresponding to the sup3r
GAN output data, averaged across the agg_factor.
Parameters
----------
dset : str
Name of dataset to retrieve from NSRDB source file
Returns
-------
out : Union[np.ndarray, da.core.Array]
Dataset of shape (time, sites) where time and sites correspond to
the same shape as the sup3r GAN output data and if agg_factor > 1
the sites is an average across multiple NSRDB sites.
"""
logger.debug('Retrieving "{}" from NSRDB source data.'.format(dset))
out = None
for idx in range(self.idnn.shape[1]):
temp = self.nsrdb[dset, self.nsrdb_tslice, self.idnn[:, idx]]
temp = temp.astype(np.float32)
if out is None:
out = temp
else:
out += temp
out /= self.idnn.shape[1]
return out
[docs]
@staticmethod
def get_sup3r_fps(fp_pattern, ignore=None):
"""Get a list of file chunks to run in parallel based on a file pattern
Note
----
It's assumed that all source files have the pattern
`sup3r_file_TTTTTT_SSSSSS.h5` where TTTTTT is the zero-padded temporal
chunk index and SSSSSS is the zero-padded spatial chunk index.
Parameters
----------
fp_pattern : str | list
Unix-style file*pattern that matches a set of spatiotemporally
chunked sup3r forward pass output files.
ignore : str | None
Ignore all files that have this string in their filenames.
Returns
-------
fp_sets : list
List of file sets where each file set is 3 temporally sequential
files over the same spatial chunk. Each file set overlaps with its
neighbor such that fp_sets[0][-1] == fp_sets[1][0] (this is so
Solar can do temporal padding when rolling from GAN local time
output to UTC).
t_slices : list
List of t_slice arguments corresponding to fp_sets to pass to
Solar class initialization that will slice and reduce the
overlapping time axis when Solar outputs irradiance data.
temporal_ids : list
List of temporal id strings TTTTTT corresponding to the fp_sets
spatial_ids : list
List of spatial id strings SSSSSS corresponding to the fp_sets
target_fps : list
List of actual target files corresponding to fp_sets, so for
example the file set fp_sets[10] sliced by t_slices[10] is designed
to process target_fps[10]
"""
all_fps = [fp for fp in expand_paths(fp_pattern) if fp.endswith('.h5')]
if ignore is not None:
all_fps = [
fp for fp in all_fps if ignore not in os.path.basename(fp)
]
all_fps = sorted(all_fps)
source_dir = os.path.dirname(all_fps[0])
source_fn_base = os.path.basename(all_fps[0]).replace('.h5', '')
source_fn_base = '_'.join(source_fn_base.split('_')[:-2])
all_id_spatial = [
fp.replace('.h5', '').split('_')[-1] for fp in all_fps
]
all_id_temporal = [
fp.replace('.h5', '').split('_')[-2] for fp in all_fps
]
all_id_spatial = sorted(set(all_id_spatial))
all_id_temporal = sorted(set(all_id_temporal))
fp_sets = []
t_slices = []
temporal_ids = []
spatial_ids = []
target_fps = []
for idt, id_temporal in enumerate(all_id_temporal):
start = 0
single_chunk_id_temps = [id_temporal]
if idt > 0:
start = 24
single_chunk_id_temps.insert(0, all_id_temporal[idt - 1])
if idt < (len(all_id_temporal) - 1):
single_chunk_id_temps.append(all_id_temporal[idt + 1])
for id_spatial in all_id_spatial:
single_fp_set = []
for t_str in single_chunk_id_temps:
fp = os.path.join(source_dir, source_fn_base)
fp += f'_{t_str}_{id_spatial}.h5'
single_fp_set.append(fp)
fp_target = os.path.join(source_dir, source_fn_base)
fp_target += f'_{id_temporal}_{id_spatial}.h5'
fp_sets.append(single_fp_set)
t_slices.append(slice(start, start + 24))
temporal_ids.append(id_temporal)
spatial_ids.append(id_spatial)
target_fps.append(fp_target)
return fp_sets, t_slices, temporal_ids, spatial_ids, target_fps
[docs]
@classmethod
def get_node_cmd(cls, config):
"""Get a CLI call to run Solar.run_temporal_chunk() on a single
node based on an input config.
Parameters
----------
config : dict
sup3r solar config with all necessary args and kwargs to
run Solar.run_temporal_chunk() on a single node.
"""
import_str = 'import time;\n'
import_str += 'from gaps import Status;\n'
import_str += 'from rex import init_logger;\n'
import_str += f'from sup3r.solar import {cls.__name__}'
fun_str = get_fun_call_str(cls.run_temporal_chunks, config)
log_file = config.get('log_file', None)
log_level = config.get('log_level', 'INFO')
log_arg_str = f'"sup3r", log_level="{log_level}"'
if log_file is not None:
log_arg_str += f', log_file="{log_file}"'
cmd = (
f"python -c '{import_str};\n"
't0 = time.time();\n'
f'logger = init_logger({log_arg_str});\n'
f'{fun_str};\n'
't_elap = time.time() - t0;\n'
)
pipeline_step = config.get('pipeline_step') or ModuleName.SOLAR
cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd)
cmd += ";'\n"
return cmd.replace('\\', '/')
[docs]
def write(self, fp_out, features=('ghi', 'dni', 'dhi')):
"""Write irradiance datasets (ghi, dni, dhi) to output h5 file.
Parameters
----------
fp_out : str
Filepath to an output h5 file to write irradiance variables to.
Parent directory will be created if it does not exist.
features : list | tuple
List of features to write to disk. These have to be attributes of
the Solar class (ghi, dni, dhi).
"""
if not os.path.exists(os.path.dirname(fp_out)):
os.makedirs(os.path.dirname(fp_out), exist_ok=True)
with RexOutputs(fp_out, 'w') as fh:
fh.meta = self.gan_data.meta
fh.time_index = self.time_index
for feature in features:
attrs = OUTPUT_ATTRS[feature]
arr = getattr(self, feature, None)
if arr is None:
msg = (
'Feature "{}" was not available from Solar '
'module class.'.format(feature)
)
logger.error(msg)
raise AttributeError(msg)
fh.add_dataset(
fp_out,
feature,
arr,
dtype=attrs['dtype'],
attrs=attrs,
chunks=attrs['chunks'],
)
logger.info(f'Added "{feature}" to output file.')
run_attrs = self.gan_data.h5[self._sup3r_fps[0]].global_attrs
run_attrs['nsrdb_source'] = self._nsrdb_fp
fh.run_attrs = run_attrs
logger.info(f'Finished writing file: {fp_out}')
[docs]
@classmethod
def run_temporal_chunks(
cls,
fp_pattern,
nsrdb_fp,
fp_out_suffix='irradiance',
tz=-7,
agg_factor=1,
nn_threshold=0.5,
cloud_threshold=0.99,
features=('ghi', 'dni', 'dhi'),
temporal_ids=None,
):
"""Run the solar module on all spatial chunks for each temporal
chunk corresponding to the fp_pattern and the given list of
temporal_ids. This typically gets run from the CLI.
Parameters
----------
fp_pattern : str
Unix-style file*pattern that matches a set of spatiotemporally
chunked sup3r forward pass output files.
nsrdb_fp : str
Filepath to NSRDB .h5 file containing clearsky_ghi, clearsky_dni,
clearsky_dhi data.
fp_out_suffix : str
Suffix to add to the input sup3r source files when writing the
processed solar irradiance data to new data files.
tz : int
The timezone offset for the data in sup3r_fps. It is assumed that
the GAN is trained on data in local time and therefore the output
in sup3r_fps should be treated as local time. For example, -7 is
MST which is default for CONUS training data.
agg_factor : int
Spatial aggregation factor for nsrdb-to-GAN-meta e.g. the number of
NSRDB spatial pixels to average for a single sup3r GAN output site.
nn_threshold : float
The KDTree nearest neighbor threshold that determines how far the
sup3r GAN output data has to be from the NSRDB source data to get
irradiance=0. Note that is value is in decimal degrees which is a
very approximate way to determine real distance.
cloud_threshold : float
Clearsky ratio threshold below which the data is considered cloudy
and DNI is calculated using DISC.
features : list | tuple
List of features to write to disk. These have to be attributes of
the Solar class (ghi, dni, dhi).
temporal_ids : list | None
Lise of zero-padded temporal ids from the file chunks that match
fp_pattern. This input typically gets set from the CLI. If None,
this will run all temporal indices.
"""
if temporal_ids is None:
cls._run_temporal_chunk(
fp_pattern=fp_pattern,
nsrdb_fp=nsrdb_fp,
fp_out_suffix=fp_out_suffix,
tz=tz,
agg_factor=agg_factor,
nn_threshold=nn_threshold,
cloud_threshold=cloud_threshold,
features=features,
temporal_id=temporal_ids,
)
else:
for temporal_id in temporal_ids:
cls._run_temporal_chunk(
fp_pattern=fp_pattern,
nsrdb_fp=nsrdb_fp,
fp_out_suffix=fp_out_suffix,
tz=tz,
agg_factor=agg_factor,
nn_threshold=nn_threshold,
cloud_threshold=cloud_threshold,
features=features,
temporal_id=temporal_id,
)
@classmethod
def _run_temporal_chunk(
cls,
fp_pattern,
nsrdb_fp,
fp_out_suffix='irradiance',
tz=-7,
agg_factor=1,
nn_threshold=0.5,
cloud_threshold=0.99,
features=('ghi', 'dni', 'dhi'),
temporal_id=None,
):
"""Run the solar module on all spatial chunks for a single temporal
chunk corresponding to the fp_pattern. This typically gets run from the
CLI.
See Also
--------
:meth:`run_temporal_chunks`
"""
temp = cls.get_sup3r_fps(fp_pattern, ignore=f'_{fp_out_suffix}.h5')
fp_sets, t_slices, temporal_ids, _, target_fps = temp
if temporal_id is not None:
fp_sets = [
fp_set
for i, fp_set in enumerate(fp_sets)
if temporal_ids[i] == temporal_id
]
t_slices = [
t_slice
for i, t_slice in enumerate(t_slices)
if temporal_ids[i] == temporal_id
]
target_fps = [
target_fp
for i, target_fp in enumerate(target_fps)
if temporal_ids[i] == temporal_id
]
zip_iter = zip(fp_sets, t_slices, target_fps)
for i, (fp_set, t_slice, fp_target) in enumerate(zip_iter):
fp_out = fp_target.replace('.h5', f'_{fp_out_suffix}.h5')
if os.path.exists(fp_out):
logger.info('%s already exists. Skipping.', fp_out)
else:
logger.info(
'Running temporal index {} out of {}.'.format(
i + 1, len(fp_sets)
)
)
kwargs = {
't_slice': t_slice,
'tz': tz,
'agg_factor': agg_factor,
'nn_threshold': nn_threshold,
'cloud_threshold': cloud_threshold,
}
tmp_out = fp_out + '.tmp'
with Solar(fp_set, nsrdb_fp, **kwargs) as solar:
solar.write(tmp_out, features=features)
os.replace(tmp_out, fp_out)