Source code for nsrdb.gap_fill.mlclouds_fill

"""Cloud Properties filling using phgynn"""

import logging
import os
import shutil
import time
from concurrent.futures import as_completed
from warnings import warn

import numpy as np
import pandas as pd
import psutil
from farms import ICE_TYPES, WATER_TYPES
from mlclouds import MODEL_FPATH
from mlclouds.model.multi_step import MultiCloudsModel
from rex import MultiFileNSRDB
from rex.utilities.execution import SpawnProcessPool

from nsrdb.data_model.variable_factory import VarFactory
from nsrdb.file_handlers.outputs import Outputs
from nsrdb.file_handlers.resource import Resource
from nsrdb.gap_fill.cloud_fill import CloudGapFill

logger = logging.getLogger(__name__)


[docs] class MLCloudsFill: """ Use the MLClouds algorithm with phygnn model to fill missing cloud data """ DEFAULT_MODEL = MODEL_FPATH def __init__( self, h5_source, fill_all=False, model_path=None, var_meta=None ): """ Parameters ---------- h5_source : str Path to directory containing multi-file resource file sets. Available formats: /h5_dir/ /h5_dir/prefix*suffix fill_all : bool Flag to fill all cloud properties for all timesteps where cloud_type is cloudy. model_path : dict | None kwargs for ``MultiCloudsModel.load`` method. Specifies ``cloud_prop_model_path`` for cloud property model and optionally ``cloud_type_model_path`` for a cloud type model. Each value is typically a fpath to a .pkl file with an accompanying .json file in the same directory. None will try to use the default model path from the mlclouds project directory. var_meta : str | pd.DataFrame | None CSV file or dataframe containing meta data for all NSRDB variables. Defaults to the NSRDB var meta csv in git repo. """ self._dset_map = {} self._res_shape = None self._h5_source = h5_source self._fill_all = fill_all self._var_meta = var_meta if model_path is None: model_path = self.DEFAULT_MODEL logger.info( 'Initializing MLCloudsFill with h5_source: {}'.format( self._h5_source ) ) logger.info( 'Initializing MLCloudsFill with model: {}'.format(model_path) ) logger.info( 'MLCloudsFill filling all cloud properties: {}'.format( self._fill_all ) ) self._phygnn_model = MultiCloudsModel.load(**model_path) if self.h5_source is not None: with MultiFileNSRDB(self.h5_source) as res: self._dset_map = res.h5._dset_map self._res_shape = res.shape
[docs] def preflight(self): """Run preflight checks and raise error if datasets are missing""" missing = [] for dset in self._phygnn_model.feature_names: ignore = ('clear', 'ice_cloud', 'water_cloud', 'bad_cloud') if dset not in ignore and dset not in self._dset_map: missing.append(dset) if any(missing): msg = ( 'The following datasets were missing in the h5_source ' 'directory: {}'.format(missing) ) logger.error(msg) raise FileNotFoundError(msg)
@property def phygnn_model(self): """ Pre-trained PhygnnModel instance Returns ------- PhygnnModel """ return self._phygnn_model @property def h5_source(self): """ Path to directory containing multi-file resource file sets. Available formats: /h5_dir/ /h5_dir/prefix*suffix Returns ------- str """ return self._h5_source @property def dset_map(self): """ Mapping of datasets to .h5 files Returns ------- dict """ return self._dset_map
[docs] def parse_feature_data(self, feature_data=None, col_slice=slice(None)): """ Parse raw feature data from .h5 files (will have gaps!) Parameters ---------- feature_data : dict | None Pre-loaded feature data to add to (optional). Keys are the feature names (nsrdb dataset names), values are 2D numpy arrays (time x sites). Any dsets already in this input won't be re-read. col_slice : slice Column slice of the resource data to work on. This is a result of chunking the columns to reduce memory load. Use slice(None) for no chunking. Returns ------- feature_data : dict Raw feature data with gaps. keys are the feature names (nsrdb dataset names), values are 2D numpy arrays (time x sites). """ logger.info('Loading feature data.') if feature_data is None: feature_data = {} dsets = ( self._phygnn_model.feature_names + self._phygnn_model.label_names ) with MultiFileNSRDB(self.h5_source) as res: for dset in dsets: if dset not in feature_data and dset in res.dsets: logger.debug('Loading {} data'.format(dset)) feature_data[dset] = res[dset, :, col_slice] mem = psutil.virtual_memory() logger.info( 'Feature data loaded for column slice {}. ' 'Memory utilization is {:.3f} GB out of {:.3f} GB total ' '({:.2f}% used).'.format( col_slice, mem.used / 1e9, mem.total / 1e9, 100 * mem.used / mem.total, ) ) return feature_data
[docs] def init_clean_arrays(self): """Initialize a dict of numpy arrays for clean data.""" arr = np.zeros(self._res_shape, dtype=np.float32) dsets = ( 'cloud_type', 'cld_press_acha', 'cld_opd_dcomp', 'cld_reff_dcomp', ) clean_arrays = {d: arr.copy() for d in dsets} fill_flag_array = arr.copy().astype(np.uint8) return clean_arrays, fill_flag_array
[docs] @staticmethod def clean_array(dset, array): """Clean a dataset array using temporal nearest neighbor interpolation. Parameters ---------- dset : str NSRDB dataset name array : np.ndarray 2D (time x sites) float numpy array of data for dset. Missing values should be set to NaN. Returns ------- array : np.ndarray 2D (time x sites) float numpy array of data for dset. Missing values should be filled """ # attempt to clean sites with mostly NaN values. This has to happen # before the df.interpolate() call. bad = np.isnan(array) any_bad = bad.any() bad_cols = (~bad).sum(axis=0) < 3 if bad.all(): msg = ( f'Feature dataset "{dset}" has all NaN data! Filling with ' 'zeros.' ) logger.warning(msg) warn(msg) array[:] = 0 elif bad_cols.any(): mean_impute = np.nanmean(array, axis=0) count = bad_cols.sum() msg = ( 'Feature dataset "{}" has {} columns with nearly all NaN ' 'values out of {} ({:.2f}%) ({} NaN values out of ' '{} total {:.2f}%). Filling with mean values by site.'.format( dset, count, array.shape[1], 100 * count / array.shape[1], bad.sum(), array.size, 100 * bad.sum() / array.size, ) ) logger.warning(msg) warn(msg) array[:, bad_cols] = mean_impute[bad_cols] # attempt to fill all remaining NaN values that should be scattered # throughout (not full sites missing data) if any_bad: array = ( pd.DataFrame(array) .interpolate('nearest') .ffill() .bfill() .values ) # Fill any persistent NaN values with the global mean (these are # usually sites that have no valid data at all) bad = np.isnan(array) bad_cols = (~bad).sum(axis=0) < 3 if bad.any() or bad_cols.any(): mean_impute = np.nanmean(array) msg = ( 'There were {} observations (out of {}) that have ' 'persistent NaN values and {} sites (out of {}) that ' 'still have all NaN values that could not be ' 'cleaned for {}. Filling with global mean value of {}'.format( bad.sum(), bad.size, bad_cols.sum(), array.shape[1], dset, mean_impute, ) ) logger.warning(msg) warn(msg) array[bad] = mean_impute return array
[docs] def clean_feature_data( self, feature_raw, fill_flag, sza_lim=90, max_workers=1 ): """ Clean feature data Parameters ---------- feature_raw : dict Raw feature data with gaps. keys are the feature names (nsrdb dataset names), values are 2D numpy arrays (time x sites). fill_flag : np.ndarray Integer array of flags showing what data was filled and why. sza_lim : int, optional Solar zenith angle limit below which missing cloud property data will be gap filled. By default 90 to fill all missing daylight data max_workers : None | int Maximum workers to clean data in parallel. 1 is serial and None uses all available workers. Returns ------- feature_data : dict Clean feature data without gaps. keys are the feature names (nsrdb dataset names), values are 2D numpy arrays (time x sites). fill_flag : np.ndarray Integer array of flags showing what data was filled and why. """ t0 = time.time() feature_data = feature_raw.copy() day = feature_data['solar_zenith_angle'] < sza_lim cloud_type = feature_data['cloud_type'] assert cloud_type.shape == fill_flag.shape # fill flag 1 and 2 are the missing cloud type flags day_missing_ctype = day & ( np.isin(fill_flag, (1, 2)) | (cloud_type < 0) ) mask = cloud_type < 0 full_missing_ctype_mask = mask.all(axis=0) if any(full_missing_ctype_mask) or mask.any(): msg = ( 'There are {} missing cloud type observations ' 'out of {} including {} with full missing ctype. This ' 'needs to be resolved using the fill_ctype_press() ' 'method before this step.'.format( mask.sum(), mask.size, full_missing_ctype_mask.sum() ) ) logger.error(msg) raise RuntimeError(msg) feature_data['cld_opd_dcomp'] = np.nan_to_num( feature_data['cld_opd_dcomp'], nan=-1.0 ) feature_data['cld_reff_dcomp'] = np.nan_to_num( feature_data['cld_reff_dcomp'], nan=-1.0 ) cloudy = np.isin(cloud_type, ICE_TYPES + WATER_TYPES) day_clouds = day & cloudy day_missing_opd = day_clouds & (feature_data['cld_opd_dcomp'] <= 0) day_missing_reff = day_clouds & (feature_data['cld_reff_dcomp'] <= 0) mask_fill_flag_opd = day_missing_opd & (fill_flag == 0) mask_fill_flag_reff = day_missing_reff & (fill_flag == 0) mask_all_bad_opd = (day_missing_opd | ~day).all(axis=0) & ( fill_flag < 2 ).all(axis=0) mask_all_bad_reff = (day_missing_reff | ~day).all(axis=0) & ( fill_flag < 2 ).all(axis=0) fill_flag[mask_fill_flag_opd] = 7 # fill_flag=7 is mlcloud fill fill_flag[mask_fill_flag_reff] = 7 # fill_flag=7 is mlcloud fill fill_flag[:, mask_all_bad_opd] = 4 fill_flag[:, mask_all_bad_reff] = 4 logger.info( '{:.2f}% of timesteps are daylight'.format( 100 * day.sum() / (day.shape[0] * day.shape[1]) ) ) logger.info( '{:.2f}% of daylight timesteps are cloudy'.format( 100 * day_clouds.sum() / day.sum() ) ) logger.info( '{:.2f}% of daylight timesteps are missing cloud type'.format( 100 * day_missing_ctype.sum() / day.sum() ) ) logger.info( '{:.2f}% of cloudy daylight timesteps are missing cloud ' 'opd'.format(100 * day_missing_opd.sum() / day_clouds.sum()) ) logger.info( '{:.2f}% of cloudy daylight timesteps are missing cloud ' 'reff'.format(100 * day_missing_reff.sum() / day_clouds.sum()) ) mask = feature_data['cld_opd_dcomp'] <= 0 feature_data['cld_opd_dcomp'][mask] = np.nan feature_data['cld_opd_dcomp'][~cloudy] = 0.0 mask = feature_data['cld_reff_dcomp'] <= 0 feature_data['cld_reff_dcomp'][mask] = np.nan feature_data['cld_reff_dcomp'][~cloudy] = 0.0 logger.debug('Column NaN values:') for c, d in feature_data.items(): pnan = 100 * np.isnan(d).sum() / (d.shape[0] * d.shape[1]) logger.debug('\t"{}" has {:.2f}% NaN values'.format(c, pnan)) logger.debug('Interpolating feature data using nearest neighbor.') if max_workers == 1: for c, d in feature_data.items(): if c not in self.phygnn_model.label_names: feature_data[c] = self.clean_array(c, d) else: logger.info( 'Interpolating feature data with {} max workers'.format( max_workers ) ) futures = {} loggers = ['nsrdb', 'rex', 'phygnn'] with SpawnProcessPool( loggers=loggers, max_workers=max_workers ) as exe: for c, d in feature_data.items(): if c not in self.phygnn_model.label_names: future = exe.submit(self.clean_array, c, d) futures[future] = c for future in as_completed(futures): c = futures[future] feature_data[c] = future.result() logger.debug('Feature data interpolation is complete.') assert ~(feature_data['cloud_type'] < 0).any() assert ~any(np.isnan(d).any() for d in feature_data.values()) assert ~(cloudy & (feature_data['cld_opd_dcomp'] <= 0)).any() assert ~(cloudy & (feature_data['cld_reff_dcomp'] <= 0)).any() logger.debug('Adding feature flag') cloud_type = feature_data['cloud_type'] day_ice_clouds = day & np.isin(cloud_type, ICE_TYPES) day_water_clouds = day & np.isin(cloud_type, WATER_TYPES) day_clouds_bad_ctype = ( day_ice_clouds | day_water_clouds ) & day_missing_ctype flag = np.full(day_ice_clouds.shape, 'night', dtype=object) flag[day] = 'clear' flag[day_ice_clouds] = 'ice_cloud' flag[day_water_clouds] = 'water_cloud' flag[day_clouds_bad_ctype] = 'bad_cloud' flag[day_missing_opd] = 'bad_cloud' flag[day_missing_reff] = 'bad_cloud' feature_data['flag'] = flag logger.debug( 'Created the "flag" dataset with the following unique ' 'values: {}'.format(np.unique(flag)) ) mem = psutil.virtual_memory() logger.debug( 'Cleaned feature data dict has these keys: {}'.format( feature_data.keys() ) ) logger.debug( 'Cleaned feature data dict values have these shapes: {}'.format( [d.shape for d in feature_data.values()] ) ) logger.debug( 'Feature flag column has these values: {}'.format( np.unique(feature_data['flag']) ) ) logger.info('Cleaning took {:.1f} seconds'.format(time.time() - t0)) logger.info( 'Memory utilization is ' '{:.3f} GB out of {:.3f} GB total ({:.2f}% used).'.format( mem.used / 1e9, mem.total / 1e9, 100 * mem.used / mem.total ) ) return feature_data, fill_flag
[docs] def archive_cld_properties(self): """ Archive original cloud property (cld_*) .h5 files. This method creates .tmp files in a ./raw/ sub directory. mark_complete_archived_files() should be run at the end to remove the .tmp designation. This will signify that the cloud fill was completed successfully. """ cld_dsets = [ 'cld_opd_dcomp', 'cld_reff_dcomp', 'cld_press_acha', 'cloud_type', ] for dset in [ds for ds in cld_dsets if ds in self.dset_map]: src_fpath = self.dset_map[dset] src_dir, f_name = os.path.split(src_fpath) dst_dir = os.path.join(src_dir, 'raw') if not os.path.exists(dst_dir): os.makedirs(dst_dir) dst_fpath = os.path.join(dst_dir, f_name) dst_fpath_tmp = dst_fpath + '.tmp' logger.debug('Archiving {} to {}'.format(src_fpath, dst_fpath_tmp)) if os.path.exists(dst_fpath): msg = ( 'A raw cloud file already exists, this suggests ' 'MLClouds gap fill has already been run: {}'.format( dst_fpath ) ) logger.error(msg) raise RuntimeError(msg) if os.path.exists(dst_fpath_tmp): # don't overwrite the tmp file, the original may have been # manipulated by a failed mlclouds job. logger.debug( 'Archive file exists, not overwriting: {}'.format( dst_fpath_tmp ) ) else: shutil.copy(src_fpath, dst_fpath_tmp)
[docs] def mark_complete_archived_files(self): """Remove the .tmp marker from the archived files once MLCloudsFill is complete""" cld_dsets = [ 'cld_opd_dcomp', 'cld_reff_dcomp', 'cld_press_acha', 'cloud_type', ] for dset in cld_dsets: fpath = self.dset_map[dset] src_dir, f_name = os.path.split(fpath) raw_path = os.path.join(src_dir, 'raw', f_name) logger.debug('Renaming .tmp raw file to {}'.format(raw_path)) if os.path.exists(raw_path + '.tmp'): os.rename(raw_path + '.tmp', raw_path) else: msg = ( 'Something went wrong. The .tmp file created at the ' 'beginning of MLCloudsFill no longer exists: {}'.format( raw_path + '.tmp' ) ) logger.error(msg) raise FileNotFoundError(msg)
[docs] def predict_cld_properties( self, feature_data, col_slice=None, low_mem=False ): """ Predict cloud properties with phygnn Parameters ---------- feature_data : dict Clean feature data without gaps. keys are the feature names (nsrdb dataset names), values are 2D numpy arrays (time x sites). col_slice : slice Column slice of the resource data to work on. This is a result of chunking the columns to reduce memory load. Just used for logging in this method. low_mem : bool Option to run predictions in low memory mode. Typically the memory bloat during prediction is: (n_time x n_sites x n_nodes_per_layer). low_mem=True will reduce this to (1000 x n_nodes_per_layer) Returns ------- predicted_data : dict Dictionary of predicted cloud properties. Keys are nsrdb dataset names, values are 2D arrays of phygnn-predicted values (time x sites). """ L = feature_data['flag'].shape[0] * feature_data['flag'].shape[1] cols = [ k for k in feature_data if k in self.phygnn_model.feature_names or k == 'flag' ] feature_df = pd.DataFrame(index=np.arange(L), columns=cols) for dset, arr in feature_data.items(): if dset in feature_df: feature_df[dset] = arr.flatten(order='F') # Predict on night timesteps as if they were clear # the phygnn model wont be trained with the night category # so this is an easy way to keep the full data shape and # cooperate with the feature names that phygnn expects night_mask = feature_df['flag'] == 'night' feature_df.loc[night_mask, 'flag'] = 'clear' logger.debug( 'Predicting gap filled cloud data for column slice {}'.format( col_slice ) ) predict_feats = set(feature_df.columns).intersection( self.phygnn_model.input_feature_names ) predict_feats = feature_df[list(predict_feats)] if not low_mem: labels = self.phygnn_model.predict(predict_feats, table=False) else: len_df = len(predict_feats) chunks = np.array_split( np.arange(len_df), int(np.ceil(len_df / 1000)) ) labels = [] for index_chunk in chunks: sub = predict_feats.iloc[index_chunk] labels.append(self.phygnn_model.predict(sub, table=False)) labels = np.concatenate(labels, axis=0) mem = psutil.virtual_memory() logger.debug( 'Prediction complete for column slice {}. Memory ' 'utilization is {:.3f} GB out of {:.3f} GB total ' '({:.2f}% used).'.format( col_slice, mem.used / 1e9, mem.total / 1e9, 100 * mem.used / mem.total, ) ) logger.debug('Label data shape: {}'.format(labels.shape)) shape = feature_data['flag'].shape predicted_data = {} for i, dset in enumerate(self.phygnn_model.output_names): logger.debug('Reshaping predicted {} to {}'.format(dset, shape)) predicted_data[dset] = labels[:, i].reshape(shape, order='F') for dset, arr in predicted_data.items(): nnan = np.isnan(arr).sum() ntot = arr.shape[0] * arr.shape[1] logger.debug( 'Raw predicted data for {} for column slice {} ' 'has mean: {:.2f}, median: {:.2f}, range: ' '({:.2f}, {:.2f}) and {} NaN values out of ' '{} ({:.2f}%)'.format( dset, col_slice, np.nanmean(arr), np.median(arr), np.nanmin(arr), np.nanmax(arr), nnan, ntot, 100 * nnan / ntot, ) ) return predicted_data
[docs] def fill_bad_cld_properties( self, predicted_data, feature_data, col_slice=None ): """ Fill bad cloud properties in the feature data from predicted data Parameters ---------- predicted_data : dict Dictionary of predicted cloud properties. Keys are nsrdb dataset names, values are 2D arrays of phygnn-predicted values (time x sites). feature_data : dict Clean feature data without gaps. keys are the feature names (nsrdb dataset names), values are 2D numpy arrays (time x sites). col_slice : slice Column slice of the resource data to work on. This is a result of chunking the columns to reduce memory load. Just used for logging in this method. Returns ------- filled_data : dict Dictionary of filled cloud properties. Keys are nsrdb dataset names, values are 2D arrays of phygnn-predicted values (time x sites). The filled data is a combination of the input predicted_data and feature_data. The datasets in the predicted_data input are used to fill the feature_data input where: (feature_data['flag'] == "bad_cloud") """ fill_mask = feature_data['flag'] == 'bad_cloud' night_mask = feature_data['flag'] == 'night' clear_mask = feature_data['flag'] == 'clear' cloud_mask = (feature_data['flag'] != 'night') & ( feature_data['flag'] != 'clear' ) logger.debug( 'Final fill mask has {} bad clouds, {} night timesteps, ' '{} clear timesteps, {} timesteps that are either night ' 'or clear, and {} cloudy timesteps out of {} ' 'total observations.'.format( fill_mask.sum(), night_mask.sum(), clear_mask.sum(), (clear_mask | night_mask).sum(), cloud_mask.sum(), fill_mask.size, ) ) if fill_mask.sum() == 0: msg = ( 'No "bad_cloud" flags were detected in the feature_data ' '"flag" dataset. Something went wrong! ' 'The cloud data is never perfect...' ) logger.warning(msg) warn(msg) if self._fill_all: logger.debug( 'Filling {} values (all cloudy timesteps) using ' 'MLClouds predictions for column slice {}'.format( np.sum(cloud_mask), col_slice ) ) else: logger.debug( 'Filling {} values using MLClouds predictions for ' 'column slice {}'.format(np.sum(fill_mask), col_slice) ) filled_data = {} for dset, arr in predicted_data.items(): varobj = VarFactory.get_base_handler(dset, var_meta=self._var_meta) arr = np.maximum(arr, 0.01) arr = np.minimum(arr, varobj.physical_max) cld_data = feature_data[dset] if self._fill_all: cld_data[cloud_mask] = arr[cloud_mask] else: cld_data[fill_mask] = arr[fill_mask] cld_data[night_mask | clear_mask] = 0 filled_data[dset] = cld_data nnan = np.isnan(filled_data[dset]).sum() ntot = filled_data[dset].shape[0] * filled_data[dset].shape[1] logger.debug( 'Final cleaned data for {} for column slice {} ' 'has mean: {:.2f}, median: {:.2f}, range: ' '({:.2f}, {:.2f}) and {} NaN values out of ' '{} ({:.2f}%)'.format( dset, col_slice, np.nanmean(filled_data[dset]), np.median(filled_data[dset]), np.nanmin(filled_data[dset]), np.nanmax(filled_data[dset]), nnan, ntot, 100 * nnan / ntot, ) ) if nnan > 0: msg = ( 'Gap filled cloud property "{}" still had ' '{} NaN values!'.format(dset, nnan) ) logger.error(msg) raise RuntimeError(msg) return filled_data
[docs] @staticmethod def fill_ctype_press(h5_source, col_slice=slice(None)): """Fill cloud type and pressure using simple temporal nearest neighbor. Parameters ---------- h5_source : str Path to directory containing multi-file resource file sets. Available formats: /h5_dir/ /h5_dir/prefix*suffix col_slice : slice Column slice of the resource data to work on. This is a result of chunking the columns to reduce memory load. Use slice(None) for no chunking. Returns ------- cloud_type : np.ndarray 2D array (time x sites) of gap-filled cloud type data. cloud_pres : np.ndarray 2D array (time x sites) of gap-filled cld_press_acha data. sza : np.ndarray 2D array (time x sites) of solar zenith angle data. fill_flag : np.ndarray Integer array of flags showing what data was filled and why. """ fill_flag = None with MultiFileNSRDB(h5_source) as f: cloud_type = f['cloud_type', :, col_slice] cloud_pres = f['cld_press_acha', :, col_slice] sza = f['solar_zenith_angle', :, col_slice] cloud_type, fill_flag = CloudGapFill.fill_cloud_type( cloud_type, fill_flag=fill_flag ) cloud_pres, fill_flag = CloudGapFill.fill_cloud_prop( 'cld_press_acha', cloud_pres, cloud_type, sza, fill_flag=fill_flag, cloud_type_is_clean=True, ) return cloud_type, cloud_pres, sza, fill_flag
[docs] def write_filled_data(self, filled_data, col_slice=slice(None)): """Write gap filled cloud data to disk Parameters ---------- filled_data : dict Dictionary of filled cloud properties. Keys are nsrdb dataset names, values are 2D arrays of phygnn-predicted values (time x sites). The filled data is a combination of the input predicted_data and feature_data. The datasets in the predicted_data input are used to fill the feature_data input where: (feature_data['flag'] == "bad_cloud") col_slice : slice Column slice of the resource data to work on. This is a result of chunking the columns to reduce memory load. Use slice(None) for no chunking. """ for dset, arr in filled_data.items(): fpath = self.dset_map[dset] with Outputs(fpath, mode='a') as f: logger.info( 'Writing filled "{}" to: {}'.format( dset, os.path.basename(fpath) ) ) f[dset, :, col_slice] = arr logger.debug('Finished writing "{}".'.format(dset))
[docs] def write_fill_flag(self, fill_flag, col_slice=slice(None)): """Write the fill flag dataset to its daily file next to the cloud property files. Parameters ---------- fill_flag : np.ndarray Integer array of flags showing what data was filled and why. col_slice : slice Column slice of the resource data to work on. This is a result of chunking the columns to reduce memory load. Use slice(None) for no chunking. """ fpath_opd = self._dset_map['cld_opd_dcomp'] fpath = fpath_opd.replace('cld_opd_dcomp', 'cloud_fill_flag') var_obj = VarFactory.get_base_handler( 'cloud_fill_flag', var_meta=self._var_meta ) with Resource(fpath_opd) as res: ti = res.time_index meta = res.meta logger.info( 'Writing cloud_fill_flag to: {}'.format(os.path.basename(fpath)) ) init_dset = False if os.path.exists(fpath): with Outputs(fpath, mode='r') as fout: if 'cloud_fill_flag' not in fout: init_dset = True else: init_dset = True try: if init_dset: with Outputs(fpath, mode='w') as fout: fout.time_index = ti fout.meta = meta init_data = np.zeros( (len(ti), len(meta)), dtype=var_obj.final_dtype ) fout._add_dset( dset_name='cloud_fill_flag', dtype=var_obj.final_dtype, data=init_data, chunks=var_obj.chunks, attrs=var_obj.attrs, ) with Outputs(fpath, mode='a') as fout: fout['cloud_fill_flag', :, col_slice] = fill_flag except Exception as e: msg = 'Could not write col_slice {} to file: "{}"'.format( col_slice, fpath ) logger.exception(msg) raise OSError from e logger.debug('Write complete') logger.info('Final cloud_fill_flag counts:') ntot = fill_flag.shape[0] * fill_flag.shape[1] for n in range(10): count = (fill_flag == n).sum() logger.info( '\tFlag {} has {} counts out of {} ({:.2f}%)'.format( n, count, ntot, 100 * count / ntot ) )
[docs] @classmethod def prep_chunk( cls, h5_source, model_path=None, var_meta=None, sza_lim=90, col_slice=slice(None), ): """Prepare a column chunk (slice) of data for phygnn prediction. Parameters ---------- h5_source : str Path to directory containing multi-file resource file sets. Available formats: /h5_dir/ /h5_dir/prefix*suffix model_path : str | None Directory to load phygnn model from. This is typically a fpath to a .pkl file with an accompanying .json file in the same directory. None will try to use the default model path from the mlclouds project directory. var_meta : str | pd.DataFrame | None CSV file or dataframe containing meta data for all NSRDB variables. Defaults to the NSRDB var meta csv in git repo. sza_lim : int, optional Solar zenith angle limit below which missing cloud property data will be gap filled. By default 90 to fill all missing daylight data col_slice : slice Column slice of the resource data to work on. This is a result of chunking the columns to reduce memory load. Use slice(None) for no chunking. Returns ------- feature_data : dict Clean feature data without gaps. keys are the feature names (nsrdb dataset names), values are 2D numpy arrays (time x sites). This is just for the col_slice being worked on. clean_data : dict Dictionary of filled cloud properties. Keys are nsrdb dataset names, values are 2D arrays (time x sites) and have nearest-neighbor cleaned values for cloud pressure and type This is just for the col_slice being worked on. fill_flag : np.ndarray Integer array of flags showing what data was filled and why. This is just for the col_slice being worked on. """ obj = cls(h5_source, model_path=model_path, var_meta=var_meta) logger.debug( 'Preparing data for MLCloudsFill for column slice {}'.format( col_slice ) ) ctype, cpres, sza, fill_flag = obj.fill_ctype_press( obj.h5_source, col_slice=col_slice ) clean_data = {'cloud_type': ctype, 'cld_press_acha': cpres} feature_data = { 'cloud_type': ctype, 'cld_press_acha': cpres, 'solar_zenith_angle': sza, } feature_data = obj.parse_feature_data( feature_data=feature_data, col_slice=col_slice ) feature_data, fill_flag = obj.clean_feature_data( feature_data, fill_flag, sza_lim=sza_lim ) logger.debug( 'Completed MLClouds data prep for column slice {}'.format( col_slice ) ) return feature_data, clean_data, fill_flag
[docs] def process_chunk( self, i_features, i_clean, i_flag, col_slice, clean_data, fill_flag, low_mem=False, ): """Use cleaned and prepared data to run phygnn predictions and create final filled data for a single column chunk. Parameters ---------- i_features : dict Clean feature data without gaps. keys are the feature names (nsrdb dataset names), values are 2D numpy arrays (time x sites). This is just for a single column chunk (col_slice). i_clean : dict Dictionary of filled cloud properties. Keys are nsrdb dataset names, values are 2D arrays (time x sites) of phygnn-predicted values (cloud opd and reff) or nearest-neighbor cleaned values (cloud pressure and type). This is just for a single column chunk (col_slice). i_flag : np.ndarray Integer array of flags showing what data was filled and why. This is just for a single column chunk (col_slice). col_slice : slice Column slice of the resource data to work on. This is a result of chunking the columns to reduce memory load. clean_data : dict Dictionary of filled cloud properties. Keys are nsrdb dataset names, values are 2D arrays (time x sites). This is for ALL chunks (full resource shape). fill_flag : np.ndarray Integer array of flags showing what data was filled and why. This is for ALL chunks (full resource shape). low_mem : bool Option to run predictions in low memory mode. Typically the memory bloat during prediction is: (n_time x n_sites x n_nodes_per_layer). low_mem=True will reduce this to (1000 x n_nodes_per_layer) Returns ------- clean_data : dict Dictionary of filled cloud properties. Keys are nsrdb dataset names, values are 2D arrays (time x sites). This has been updated with phygnn-predicted values (cloud opd and reff) or nearest-neighbor cleaned values (cloud pressure and type) This is for ALL chunks (full resource shape). fill_flag : np.ndarray Integer array of flags showing what data was filled and why. This is for ALL chunks (full resource shape). """ i_predicted = self.predict_cld_properties( i_features, col_slice=col_slice, low_mem=low_mem ) i_filled = self.fill_bad_cld_properties( i_predicted, i_features, col_slice=col_slice ) i_clean.update(i_filled) fill_flag[:, col_slice] = i_flag for k, v in clean_data.items(): v[:, col_slice] = i_clean[k] return clean_data, fill_flag
[docs] @classmethod def clean_data_model( cls, data_model, fill_all=False, model_path=None, var_meta=None, sza_lim=90, low_mem=False, ): """Run the MLCloudsFill process on data in-memory in an nsrdb data model object. Parameters ---------- data_model : nsrdb.data_model.DataModel DataModel object with processed source data (cloud data + ancillary processed onto the nsrdb grid). fill_all : bool Flag to fill all cloud properties for all timesteps where cloud_type is cloudy. model_path : str | None Directory to load phygnn model from. This is typically a fpath to a .pkl file with an accompanying .json file in the same directory. None will try to use the default model path from the mlclouds project directory. var_meta : str | pd.DataFrame | None CSV file or dataframe containing meta data for all NSRDB variables. Defaults to the NSRDB var meta csv in git repo. sza_lim : int, optional Solar zenith angle limit below which missing cloud property data will be gap filled. By default 90 to fill all missing daylight data low_mem : bool Option to run predictions in low memory mode. Typically the memory bloat during prediction is: (n_time x n_sites x n_nodes_per_layer). low_mem=True will reduce this to (1000 x n_nodes_per_layer) Returns ------- data_model : nsrdb.data_model.DataModel DataModel object with processed source data (cloud data + ancillary processed onto the nsrdb grid). The cloud property datasets (cloud_type, cld_opd_dcomp, cld_reff_dcomp, cloud_fill_flag) are now cleaned. """ obj = cls( None, fill_all=fill_all, model_path=model_path, var_meta=var_meta ) logger.info('Preparing data for MLCloudsFill to clean data model') ctype = data_model['cloud_type'] sza = data_model['solar_zenith_angle'] ctype, fill_flag = CloudGapFill.fill_cloud_type(ctype, fill_flag=None) feature_data = {'cloud_type': ctype, 'solar_zenith_angle': sza} logger.info('Loading feature data.') dsets = obj._phygnn_model.feature_names + obj._phygnn_model.label_names for dset in dsets: if dset not in feature_data and dset in data_model: feature_data[dset] = data_model[dset] mem = psutil.virtual_memory() logger.info( 'Feature data loaded for data model cleaning. ' 'Memory utilization is {:.3f} GB out of {:.3f} GB total ' '({:.2f}% used).'.format( mem.used / 1e9, mem.total / 1e9, 100 * mem.used / mem.total ) ) feature_data, fill_flag = obj.clean_feature_data( feature_data, fill_flag, sza_lim=sza_lim ) logger.info('Completed MLClouds data prep') predicted = obj.predict_cld_properties(feature_data, low_mem=low_mem) filled = obj.fill_bad_cld_properties(predicted, feature_data) feature_data['cloud_fill_flag'] = fill_flag for k, v in feature_data.items(): logger.info( 'Sending cleaned feature dataset "{}" to data model ' 'with shape {}'.format(k, v.shape) ) data_model[k] = v for k, v in filled.items(): logger.info( 'Sending cleaned cloud property dataset "{}" to ' 'data model with shape {}'.format(k, v.shape) ) data_model[k] = v logger.info('Finished MLClouds fill of data model object.') return data_model
[docs] @classmethod def merra_clouds( cls, h5_source, var_meta=None, merra_fill_flag=8, fill_all=False, model_path=None, ): """Quick check to see if cloud data is from a merra source in which case it should be gap-free and cloud_fill_flag will be written with all 8's Parameters ---------- h5_source : str Path to directory containing multi-file resource file sets. Available formats: /h5_dir/ /h5_dir/prefix*suffix var_meta : str | pd.DataFrame | None CSV file or dataframe containing meta data for all NSRDB variables. Defaults to the NSRDB var meta csv in git repo. merra_fill_flag : int Integer fill flag representing where merra data was used as source cloud data. fill_all : bool Flag to fill all cloud properties for all timesteps where cloud_type is cloudy. model_path : str | None Directory to load phygnn model from. This is typically a fpath to a .pkl file with an accompanying .json file in the same directory. None will try to use the default model path from the mlclouds project directory. Returns ------- is_merra : bool Flag that is True if cloud data is from merra """ mlclouds = cls( h5_source, var_meta=var_meta, model_path=model_path, fill_all=fill_all, ) with MultiFileNSRDB(h5_source) as res: attrs = res.attrs.get('cld_opd_dcomp', {}) if 'merra' in attrs.get('data_source', '').lower(): logger.info( 'Found cloud data from MERRA2 for source: {}'.format(h5_source) ) fill_flag_arr = mlclouds.init_clean_arrays()[1] fill_flag_arr[:] = merra_fill_flag mlclouds.write_fill_flag(fill_flag_arr) return True return False
[docs] @classmethod def run( cls, h5_source, fill_all=False, model_path=None, var_meta=None, sza_lim=90, col_chunk=None, max_workers=None, low_mem=False, ): """ Fill cloud properties using phygnn predictions. Original files will be archived to a new "raw/" sub-directory Parameters ---------- h5_source : str Path to directory containing multi-file resource file sets. Available formats: /h5_dir/ /h5_dir/prefix*suffix fill_all : bool Flag to fill all cloud properties for all timesteps where cloud_type is cloudy. model_path : str | None Directory to load phygnn model from. This is typically a fpath to a .pkl file with an accompanying .json file in the same directory. None will try to use the default model path from the mlclouds project directory. var_meta : str | pd.DataFrame | None CSV file or dataframe containing meta data for all NSRDB variables. Defaults to the NSRDB var meta csv in git repo. sza_lim : int, optional Solar zenith angle limit below which missing cloud property data will be gap filled. By default 90 to fill all missing daylight data col_chunk : None | int Optional chunking method to gap fill one column chunk at a time to reduce memory requirements. If provided, this should be an integer specifying how many columns to work on at one time. max_workers : None | int Maximum workers for running mlclouds in parallel. 1 is serial and None uses all available workers. low_mem : bool Option to run predictions in low memory mode. Typically the memory bloat during prediction is: (n_time x n_sites x n_nodes_per_layer). low_mem=True will reduce this to (1000 x n_nodes_per_layer) """ logger.info( f'Running MLCloudsFill with h5_source: {h5_source}. Running ' f'MLCloudsFill with model: {model_path}. Running MLCloudsFill ' f'with col_chunk: {col_chunk}.' ) obj = cls( h5_source, fill_all=fill_all, model_path=model_path, var_meta=var_meta, ) obj.preflight() obj.archive_cld_properties() clean_data, fill_flag = obj.init_clean_arrays() if col_chunk is None: slices = [slice(None)] logger.info( 'MLClouds gap fill is being run without col_chunk for ' f'full data shape {obj._res_shape} all on one process. If you ' 'see memory errors, try setting the col_chunk input to ' 'distribute the problem across multiple small workers.' ) else: columns = np.arange(obj._res_shape[1]) N = np.ceil(len(columns) / col_chunk) arrays = np.array_split(columns, N) slices = [slice(a[0], 1 + a[-1]) for a in arrays] logger.info( 'MLClouds gap fill will be run across the full data ' f'column shape {len(columns)} in {len(slices)} column chunks ' f'with approximately {col_chunk} sites per chunk.' ) if max_workers == 1: for col_slice in slices: out = obj.prep_chunk( h5_source, model_path=model_path, var_meta=var_meta, sza_lim=sza_lim, col_slice=col_slice, ) i_features, i_clean, i_flag = out out = obj.process_chunk( i_features, i_clean, i_flag, col_slice, clean_data, fill_flag, low_mem=low_mem, ) clean_data, fill_flag = out del i_features, i_clean, i_flag else: futures = {} logger.info( 'Starting process pool for parallel phygnn cloud ' f'fill with {max_workers} workers.' ) loggers = ['nsrdb', 'rex', 'phygnn'] with SpawnProcessPool( loggers=loggers, max_workers=max_workers ) as exe: for col_slice in slices: future = exe.submit( obj.prep_chunk, h5_source, model_path=model_path, var_meta=var_meta, sza_lim=sza_lim, col_slice=col_slice, ) futures[future] = col_slice logger.info(f'Kicked off {len(futures)} futures.') for i, future in enumerate(as_completed(futures)): col_slice = futures[future] i_features, i_clean, i_flag = future.result() out = obj.process_chunk( i_features, i_clean, i_flag, col_slice, clean_data, fill_flag, low_mem=low_mem, ) clean_data, fill_flag = out del i_features, i_clean, i_flag, future mem = psutil.virtual_memory() logger.info( 'MLCloudsFill futures completed: ' '{} out of {}. ' 'Current memory usage is ' '{:.3f} GB out of {:.3f} GB total.'.format( i + 1, len(futures), mem.used / 1e9, mem.total / 1e9, ) ) obj.write_filled_data(clean_data, col_slice=slice(None)) obj.write_fill_flag(fill_flag, col_slice=slice(None)) obj.mark_complete_archived_files() logger.info( 'Completed MLCloudsFill for h5_source: {}'.format(h5_source) )