"""NSRDB Typical Meteorological Year (TMY) runner code."""
import logging
import os
import shutil
from concurrent.futures import as_completed
from typing import ClassVar
import h5py
import numpy as np
from cloud_fs import FileSystem
from rex import init_logger
from rex.utilities.execution import SpawnProcessPool
from nsrdb.data_model.variable_factory import VarFactory
from nsrdb.file_handlers.outputs import Outputs
from nsrdb.file_handlers.resource import Resource
from .tmy import Tmy
logger = logging.getLogger(__name__)
class TmyRunner:
"""Class to handle running TMY, collecting outs, and writing to files."""
WEIGHTS: ClassVar = {
'TMY': {
'max_air_temperature': 0.05,
'min_air_temperature': 0.05,
'mean_air_temperature': 0.1,
'max_dew_point': 0.05,
'min_dew_point': 0.05,
'mean_dew_point': 0.1,
'max_wind_speed': 0.05,
'mean_wind_speed': 0.05,
'sum_dni': 0.25,
'sum_ghi': 0.25,
},
'TDY': {'sum_dni': 1.0},
'TGY': {'sum_ghi': 1.0},
}
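    # A minimal sketch of a custom weighting, assuming the same statistic
    # names used in WEIGHTS above. Any custom mapping passed to ``tmy()``
    # must sum to 1.0, e.g.:
    #
    #     custom_weights = {'sum_dni': 0.5, 'sum_ghi': 0.5}
    #     TmyRunner.tmy(nsrdb_base_fp, years, out_dir, fn_out,
    #                   weights=custom_weights)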
def __init__(
self,
nsrdb_base_fp,
years,
weights,
sites_per_worker=100,
n_nodes=1,
node_index=0,
site_slice=None,
out_dir='/tmp/scratch/tmy/',
fn_out='tmy.h5',
supplemental_fp=None,
var_meta=None,
):
"""
Parameters
----------
nsrdb_base_fp : str
Base nsrdb filepath to retrieve annual files from. Must include
a single {} format option for the year. Can include * for an
NSRDB multi file source.
years : iterable
Iterable of years to include in the TMY calculation.
weights : dict
Lookup of {dset: weight} where dset is a variable h5 dset name
and weight is a fractional TMY weighting. All weights must
            sum to 1.0.
sites_per_worker : int
Number of sites to run at once (sites per core/worker).
n_nodes : int
Number of nodes being run.
node_index : int
Index of this node job.
        site_slice : slice | list | None
            Sites to consider in the GLOBAL TMY run. If multiple node jobs
            are being run, the site slice should be the same for all jobs
            and should slice the full spatial extent of the meta data.
        out_dir : str
            Directory for temporary chunk files and the final output file.
fn_out : str
Final output filename.
        supplemental_fp : None | dict
            Base filepaths for supplemental datasets (must include {} for
            the year) used as uncommon inputs to the TMY calculation. For
            example: {'poa': '/projects/pxs/poa_h5_dir/poa_out_{}.h5'}
        var_meta : str
            CSV filepath containing meta data for all NSRDB variables.
            Defaults to the NSRDB var meta CSV in the git repo.
"""
logger.info('Initializing TMY runner for years: {}'.format(years))
logger.info('TMY weights: {}'.format(weights))
self._nsrdb_base_fp = nsrdb_base_fp
self._years = years
self._weights = weights
self._sites_per_worker = sites_per_worker
self._n_nodes = n_nodes
self._node_index = node_index
self._site_chunks = None
self._site_chunks_index = None
        if site_slice is None:
            self._site_slice = slice(None)
        elif isinstance(site_slice, list):
            self._site_slice = slice(*site_slice)
        else:
            self._site_slice = site_slice
self._meta = None
self._dsets = None
self._out_dir = out_dir
self._fn_out = fn_out
self._final_fpath = os.path.join(self._out_dir, self._fn_out)
self._supplemental_fp = supplemental_fp
self._var_meta = var_meta
        os.makedirs(self._out_dir, exist_ok=True)
self._tmy_obj = Tmy(
self._nsrdb_base_fp,
self._years,
self._weights,
site_slice=slice(0, 1),
supplemental_fp=supplemental_fp,
)
out = self._setup_job_chunks(
self.meta,
self._sites_per_worker,
self._n_nodes,
self._node_index,
self._out_dir,
)
self._site_chunks, self._site_chunks_index, self._f_out_chunks = out
logger.info('Node meta data is: \n{}'.format(self.meta))
logger.info(
'Node index {} with n_nodes {} running site chunks: '
'{} ... {}'.format(
node_index,
n_nodes,
str(self._site_chunks)[:100],
str(self._site_chunks)[-100:],
)
)
logger.info(
'Node index {} with n_nodes {} running site chunks '
'indices: {} ... {}'.format(
node_index,
n_nodes,
str(self._site_chunks_index)[:100],
str(self._site_chunks_index)[-100:],
)
)
logger.info(
'Node index {} with n_nodes {} running fout chunks: '
'{} ... {}'.format(
node_index,
n_nodes,
str(self._f_out_chunks)[:100],
str(self._f_out_chunks)[-100:],
)
)
@staticmethod
def _setup_job_chunks(
meta, sites_per_worker, n_nodes, node_index, out_dir
):
"""Setup chunks and file names for a multi-chunk multi-node job.
Parameters
----------
meta : pd.DataFrame
FULL NSRDB meta data.
sites_per_worker : int
Number of sites to run at once (sites per core/worker).
n_nodes : int
Number of nodes being run.
node_index : int
Index of this node job (if a multi node job is being run).
out_dir : str
Directory to dump temporary output files.
Returns
-------
site_chunks : list
List of slices setting the site chunks to be run by this job.
site_chunks_index : list
List of integers setting the site chunk indices to be run by
this job.
f_out_chunks : dict
Dictionary of file output paths keyed by the site chunk indices.
"""
arr = meta.index.values
tmp = np.array_split(arr, np.ceil(len(arr) / sites_per_worker))
site_chunks = [slice(x.min(), x.max() + 1) for x in tmp]
site_chunks_index = list(range(len(site_chunks)))
site_chunks = np.array_split(np.array(site_chunks), n_nodes)[
node_index
].tolist()
site_chunks_index = np.array_split(
np.array(site_chunks_index), n_nodes
)[node_index].tolist()
f_out_chunks = {}
        chunk_dir = os.path.join(out_dir, 'chunks/')
        os.makedirs(chunk_dir, exist_ok=True)
for ichunk in site_chunks_index:
f_out = os.path.join(chunk_dir, 'temp_out_{}.h5'.format(ichunk))
f_out_chunks[ichunk] = f_out
return site_chunks, site_chunks_index, f_out_chunks
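    # Illustrative worked example (values are not from the source): with a
    # 250-site meta index, sites_per_worker=100 and n_nodes=2, the index is
    # split into slices [0:84], [84:167], [167:250]; node 0 then runs the
    # first two slices and node 1 runs the last one, with temporary files
    # written to <out_dir>/chunks/temp_out_<ichunk>.h5.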
@property
def meta(self):
"""Get the full NSRDB meta data."""
if self._meta is None:
fpath, Handler = self._tmy_obj._get_fpath('ghi', self._years[0])
with FileSystem(fpath) as f, Handler(f) as res:
self._meta = res.meta.iloc[self._site_slice, :]
return self._meta
@property
def dsets(self):
"""Get the NSRDB datasets excluding meta and time index."""
if self._dsets is None:
fpath, Handler = self._tmy_obj._get_fpath('ghi', self._years[0])
with FileSystem(fpath) as f, Handler(f) as res:
self._dsets = []
for d in res.dsets:
if res.shapes[d] == res.shape:
self._dsets.append(d)
if self._supplemental_fp is not None:
self._dsets += list(self._supplemental_fp.keys())
self._dsets.append('tmy_year')
self._dsets.append('tmy_year_short')
self._dsets = list(set(self._dsets))
return self._dsets
@property
def site_chunks(self):
"""Get a list of site chunk slices to parallelize across"""
return self._site_chunks
@staticmethod
def get_dset_attrs(dsets, var_meta=None):
"""Get output file dataset attributes for a set of datasets.
Parameters
----------
dsets : list
List of dataset / variable names.
var_meta : str | pd.DataFrame | None
CSV file or dataframe containing meta data for all NSRDB variables.
            Defaults to the NSRDB var meta CSV in the git repo.
Returns
-------
attrs : dict
Dictionary of dataset attributes keyed by dset name.
chunks : dict
Dictionary of chunk tuples keyed by dset name.
dtypes : dict
            Dictionary of numpy datatypes keyed by dset name.
"""
attrs = {}
chunks = {}
dtypes = {}
for dset in dsets:
var_obj = VarFactory.get_base_handler(dset, var_meta=var_meta)
attrs[dset] = var_obj.attrs
chunks[dset] = var_obj.chunks
dtypes[dset] = var_obj.final_dtype
if 'units' in attrs[dset]:
attrs[dset]['psm_units'] = attrs[dset]['units']
if 'scale_factor' in attrs[dset]:
attrs[dset]['psm_scale_factor'] = attrs[dset]['scale_factor']
return attrs, chunks, dtypes
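    # Usage sketch (dataset names are placeholders): the three returned
    # dicts feed directly into ``Outputs.init_h5``, mirroring the call made
    # in ``_init_file`` below, e.g.:
    #
    #     attrs, chunks, dtypes = TmyRunner.get_dset_attrs(['dni', 'ghi'])
    #     Outputs.init_h5(f_out, ['dni', 'ghi'], attrs, chunks, dtypes,
    #                     time_index, meta)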
def _collect_chunk(self, f_out_chunk, site_slice, out):
"""Add single chunk to final output."""
f_chunk_basename = os.path.basename(f_out_chunk)
with Resource(f_out_chunk, unscale=True) as chunk:
for dset in self.dsets:
try:
data = chunk[dset]
except Exception as e:
m = (
f'Could not read file dataset "{dset}" from file '
f'"{f_chunk_basename}". Received the following '
f'exception: \n{e}'
)
logger.exception(m)
raise e
else:
out[dset, :, site_slice] = data
return out
def _collect(self, purge_chunks=False):
"""Collect all chunked files into the final fout."""
status_file = os.path.join(self._out_dir, 'collect_status.txt')
status = self._pre_collect(status_file)
self._init_final_fout()
with Outputs(self._final_fpath, mode='a', unscale=True) as out:
for i, f_out_chunk in self._f_out_chunks.items():
site_slice = self.site_chunks[i]
f_chunk_basename = os.path.basename(f_out_chunk)
msg = f'Skipping file, already collected: {f_chunk_basename}'
if f_chunk_basename not in status:
out = self._collect_chunk(
f_out_chunk=f_out_chunk, site_slice=site_slice, out=out
)
with open(status_file, 'a') as f:
f.write('{}\n'.format(os.path.basename(f_out_chunk)))
msg = (
f'Finished collecting #{i + 1} out of '
f'{len(self._f_out_chunks)} for sites {site_slice} '
f'from file {f_chunk_basename}'
)
logger.info(msg)
if purge_chunks:
chunk_dir = os.path.dirname(
next(iter(self._f_out_chunks.values()))
)
logger.info('Purging chunk directory: {}'.format(chunk_dir))
shutil.rmtree(chunk_dir)
def _pre_collect(self, status_file):
"""Check to see if all chunked files exist before running collect
Parameters
----------
status_file : str
Filepath to status file with a line for each file that has been
collected.
Returns
-------
status : list
List of filenames that have already been collected.
"""
missing = [
fp for fp in self._f_out_chunks.values() if not os.path.exists(fp)
]
if any(missing):
emsg = 'Chunked file outputs are missing: {}'.format(missing)
logger.error(emsg)
raise FileNotFoundError(emsg)
msg = 'All chunked files found. Running collection.'
logger.info(msg)
status = []
if os.path.exists(status_file):
with open(status_file) as f:
status = f.readlines()
status = [s.strip('\n') for s in status]
return status
def _init_final_fout(self):
"""Initialize the final output file."""
self._init_file(
self._final_fpath,
self.dsets,
self._tmy_obj.time_index,
self.meta,
var_meta=self._var_meta,
)
@staticmethod
def _init_file(f_out, dsets, time_index, meta, var_meta=None):
"""Initialize an output file.
Parameters
----------
f_out : str
File path to final .h5 file.
dsets : list
List of dataset names to initialize
        time_index : pd.DatetimeIndex
            Time index to initialize the file with.
        meta : pd.DataFrame
            Meta data to initialize the file with.
        var_meta : str | pd.DataFrame | None
            CSV file or dataframe containing meta data for all NSRDB variables.
            Defaults to the NSRDB var meta CSV in the git repo.
"""
if not os.path.isfile(f_out):
dsets_mod = [d for d in dsets if 'tmy_year' not in d]
attrs, chunks, dtypes = TmyRunner.get_dset_attrs(
dsets_mod, var_meta=var_meta
)
dsets_mod.append('tmy_year')
attrs['tmy_year'] = {
'units': 'selected_year',
'scale_factor': 1,
'psm_units': 'selected_year',
'psm_scale_factor': 1,
}
chunks['tmy_year'] = chunks['dni']
dtypes['tmy_year'] = np.uint16
Outputs.init_h5(
f_out, dsets_mod, attrs, chunks, dtypes, time_index, meta
)
with h5py.File(f_out, mode='a') as f:
d = 'tmy_year_short'
f.create_dataset(d, shape=(12, len(meta)), dtype=np.uint16)
f[d].attrs['units'] = 'selected_year'
f[d].attrs['scale_factor'] = 1
f[d].attrs['psm_units'] = 'selected_year'
f[d].attrs['psm_scale_factor'] = 1
@staticmethod
def _write_output(f_out, data_dict, time_index, meta, var_meta=None):
"""Initialize and write an output file chunk.
Parameters
----------
f_out : str
File path to final .h5 file.
        data_dict : dict
            Dictionary of TMY output data arrays keyed by dataset name.
        time_index : pd.DatetimeIndex
            Time index to initialize the file with.
        meta : pd.DataFrame
            Meta data to initialize the file with.
        var_meta : str | pd.DataFrame | None
            CSV file or dataframe containing meta data for all NSRDB variables.
            Defaults to the NSRDB var meta CSV in the git repo.
"""
logger.debug('Saving TMY results to: {}'.format(f_out))
TmyRunner._init_file(
f_out, list(data_dict.keys()), time_index, meta, var_meta=var_meta
)
with Outputs(f_out, mode='a') as f:
for dset, arr in data_dict.items():
f[dset] = arr
@staticmethod
def _run_file(fp):
"""Check whether to run tmy for a given output filepath based on
whether that file already exists and its file size."""
run = True
if os.path.exists(fp):
size = os.path.getsize(fp)
if size > 1e6:
run = False
return run
@staticmethod
def run_single(
nsrdb_base_fp,
years,
weights,
site_slice,
dsets,
f_out,
supplemental_fp=None,
var_meta=None,
):
"""Run TMY for a single site chunk (slice) and save to disk.
Parameters
----------
nsrdb_base_fp : str
Base nsrdb filepath to retrieve annual files from. Must include
a single {} format option for the year. Can include * for an
NSRDB multi file source.
years : iterable
Iterable of years to include in the TMY calculation.
weights : dict
Lookup of {dset: weight} where dset is a variable h5 dset name
and weight is a fractional TMY weighting. All weights must
            sum to 1.0.
site_slice : slice
Sites to consider in this TMY chunk.
dsets : list
List of TMY datasets to make.
f_out : str
Filepath to save file for this chunk.
        supplemental_fp : None | dict
            Base filepaths for supplemental datasets (must include {} for
            the year) used as uncommon inputs to the TMY calculation. For
            example: {'poa': '/projects/pxs/poa_h5_dir/poa_out_{}.h5'}
        var_meta : str | pd.DataFrame | None
            CSV file or dataframe containing meta data for all NSRDB variables.
            Defaults to the NSRDB var meta CSV in the git repo.
Returns
-------
        bool
            Always returns True once the chunk has been written or skipped.
"""
run = TmyRunner._run_file(f_out)
if run:
data_dict = {}
tmy = Tmy(
nsrdb_base_fp=nsrdb_base_fp,
years=years,
weights=weights,
site_slice=site_slice,
supplemental_fp=supplemental_fp,
)
for dset in dsets:
data_dict[dset] = tmy.get_tmy_timeseries(dset)
TmyRunner._write_output(
f_out, data_dict, tmy.time_index, tmy.meta, var_meta=var_meta
)
else:
logger.info(
'Skipping chunk, f_out already exists: {}'.format(f_out)
)
return True
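    # Hypothetical single-chunk call (paths, years, and dsets are
    # placeholders, not values from the source):
    #
    #     TmyRunner.run_single(
    #         nsrdb_base_fp='/data/nsrdb/nsrdb_{}.h5',
    #         years=range(1998, 2021),
    #         weights=TmyRunner.WEIGHTS['TMY'],
    #         site_slice=slice(0, 100),
    #         dsets=['dni', 'ghi', 'tmy_year'],
    #         f_out='/tmp/scratch/tmy/chunks/temp_out_0.h5',
    #     )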
def _run_serial(self):
"""Run serial tmy futures and save temp chunks to disk."""
logger.info(
f'Running in serial for {len(self.site_chunks)} site chunks'
)
for i, site_slice in enumerate(self.site_chunks):
self.run_single(
nsrdb_base_fp=self._nsrdb_base_fp,
years=self._years,
weights=self._weights,
site_slice=site_slice,
dsets=self.dsets,
f_out=self._f_out_chunks[self._site_chunks_index[i]],
supplemental_fp=self._supplemental_fp,
var_meta=self._var_meta,
)
logger.info(
f'{i + 1} out of {len(self.site_chunks)} TMY chunks completed.'
)
def _run_parallel(self):
"""Run parallel tmy futures and save temp chunks to disk."""
futures = {}
loggers = ['nsrdb']
logger.info(
f'Running in parallel for {len(self.site_chunks)} site chunks'
)
with SpawnProcessPool(loggers=loggers) as exe:
logger.info(f'Kicking off {len(self.site_chunks)} futures.')
for i, site_slice in enumerate(self.site_chunks):
future = exe.submit(
self.run_single,
nsrdb_base_fp=self._nsrdb_base_fp,
years=self._years,
weights=self._weights,
site_slice=site_slice,
dsets=self.dsets,
f_out=self._f_out_chunks[self._site_chunks_index[i]],
supplemental_fp=self._supplemental_fp,
var_meta=self._var_meta,
)
futures[future] = i
logger.info(f'Finished kicking off {len(futures)} futures.')
for i, future in enumerate(as_completed(futures)):
if future.result():
logger.info(
f'{i + 1} out of {len(futures)} futures completed.'
)
else:
logger.warning(f'Future #{i + 1} failed!')
def _run(self):
"""Run in serial or parallel depending on number of chunks."""
if len(self.site_chunks) > 1:
self._run_parallel()
else:
self._run_serial()
@classmethod
def tmy(
cls,
nsrdb_base_fp,
years,
out_dir,
fn_out,
tmy_type='tmy',
weights=None,
sites_per_worker=100,
n_nodes=1,
node_index=0,
log=True,
log_level='INFO',
log_file=None,
site_slice=None,
supplemental_fp=None,
var_meta=None,
):
"""Run the TMY. Option for custom weights. Select default weights for
TMY / TDY / TGY with `tmy_type` and `weights = None`"""
if log:
init_logger('nsrdb.tmy', log_level=log_level, log_file=log_file)
if weights is None:
weights = cls.WEIGHTS[tmy_type.upper()]
tmy = cls(
nsrdb_base_fp,
years,
weights,
sites_per_worker=sites_per_worker,
out_dir=out_dir,
fn_out=fn_out,
n_nodes=n_nodes,
node_index=node_index,
site_slice=site_slice,
supplemental_fp=supplemental_fp,
var_meta=var_meta,
)
tmy._run()
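    # Hypothetical kick-off of a TDY run on one of two nodes (paths are
    # placeholders):
    #
    #     TmyRunner.tmy('/data/nsrdb/nsrdb_{}.h5', range(1998, 2021),
    #                   out_dir='/scratch/tmy/', fn_out='tdy.h5',
    #                   tmy_type='tdy', n_nodes=2, node_index=0)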
@classmethod
def collect(
cls,
nsrdb_base_fp,
years,
out_dir,
fn_out,
sites_per_worker=100,
site_slice=None,
supplemental_fp=None,
var_meta=None,
log=True,
log_level='INFO',
log_file=None,
purge_chunks=False,
):
"""Run TMY collection.
Parameters
----------
nsrdb_base_fp : str
Base nsrdb filepath to retrieve annual files from. Must include
a single {} format option for the year. Can include * for an
NSRDB multi file source.
years : iterable
Iterable of years to include in the TMY calculation.
        out_dir : str
            Directory containing the temporary chunk files and to which the
            final output file is written.
fn_out : str
Final output filename.
sites_per_worker : int
Number of sites to run at once (sites per core/worker). Used here
to determine size of chunks to collect. Needs to match the value
used during the initial call to `tmy()`.
site_slice : slice
Sites to consider in this TMY.
        supplemental_fp : None | dict
            Base filepaths for supplemental datasets (must include {} for
            the year) used as uncommon inputs to the TMY calculation. For
            example: {'poa': '/projects/pxs/poa_h5_dir/poa_out_{}.h5'}
        var_meta : str
            CSV filepath containing meta data for all NSRDB variables.
            Defaults to the NSRDB var meta CSV in the git repo.
        log : bool
            Whether to initialize a logger for this run.
        log_level : str
            Log level for log output.
        log_file : str | None
            Optional file path to write log output to.
        purge_chunks : bool
            Whether to purge the chunk files after collection into the
            single final output file.
"""
if log:
init_logger('nsrdb.tmy', log_level=log_level, log_file=log_file)
        # the weights passed here only satisfy the constructor; they are not
        # used during collection
        runner = cls(
nsrdb_base_fp,
years,
sites_per_worker=sites_per_worker,
weights={'sum_ghi': 1.0},
out_dir=out_dir,
fn_out=fn_out,
n_nodes=1,
node_index=0,
site_slice=site_slice,
supplemental_fp=supplemental_fp,
var_meta=var_meta,
)
        runner._collect(purge_chunks=purge_chunks)
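# Hypothetical end-to-end collection step run after all ``tmy()`` node jobs
# finish (paths mirror the sketch above and are placeholders; note that
# sites_per_worker must match the value used in the tmy() calls):
#
#     TmyRunner.collect('/data/nsrdb/nsrdb_{}.h5', range(1998, 2021),
#                       out_dir='/scratch/tmy/', fn_out='tdy.h5',
#                       purge_chunks=True)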