"""Utilities to calculate the bias correction factors for biased data that is
going to be fed into the sup3r downscaling models. This is typically used to
bias correct GCM data vs. some historical record like the WTK or NSRDB."""

import copy
import json
import logging
import os
from abc import abstractmethod
from concurrent.futures import ProcessPoolExecutor, as_completed

import h5py
import numpy as np
import pandas as pd
import rex
from rex.utilities.fun_utils import get_fun_call_str
from scipy import stats
from scipy.spatial import KDTree

import sup3r.preprocessing.data_handling
from sup3r.preprocessing.data_handling.base import DataHandler
from sup3r.utilities import VERSION_RECORD, ModuleName
from sup3r.utilities.cli import BaseCLI
from sup3r.utilities.utilities import expand_paths
from .mixins import FillAndSmoothMixin

logger = logging.getLogger(__name__)

[docs] class DataRetrievalBase: """Base class to handle data retrieval for the biased data and the baseline data """ def __init__(self, base_fps, bias_fps, base_dset, bias_feature, distance_upper_bound=None, target=None, shape=None, base_handler='Resource', bias_handler='DataHandlerNCforCC', base_handler_kwargs=None, bias_handler_kwargs=None, decimals=None, match_zero_rate=False): """ Parameters ---------- base_fps : list | str One or more baseline .h5 filepaths representing non-biased data to use to correct the biased dataset. This is typically several years of WTK or NSRDB files. bias_fps : list | str One or more biased .nc or .h5 filepaths representing the biased data to be corrected based on the baseline data. This is typically several years of GCM .nc files. base_dset : str A single dataset from the base_fps to retrieve. In the case of wind components, this can be U_100m or V_100m which will retrieve windspeed and winddirection and derive the U/V component. bias_feature : str This is the biased feature from bias_fps to retrieve. This should be a single feature name corresponding to base_dset distance_upper_bound : float Upper bound on the nearest neighbor distance in decimal degrees. This should be the approximate resolution of the low-resolution bias data. None (default) will calculate this based on the median distance between points in bias_fps target : tuple (lat, lon) lower left corner of raster to retrieve from bias_fps. If None then the lower left corner of the full domain will be used. shape : tuple (rows, cols) grid size to retrieve from bias_fps. If None then the full domain shape will be used. base_handler : str Name of rex resource handler or sup3r.preprocessing.data_handling class to be retrieved from the rex/sup3r library. If a sup3r.preprocessing.data_handling class is used, all data will be loaded in this class' initialization and the subsequent bias calculation will be done in serial bias_handler : str Name of the bias data handler class to be retrieved from the sup3r.preprocessing.data_handling library. base_handler_kwargs : dict | None Optional kwargs to send to the initialization of the base_handler class bias_handler_kwargs : dict | None Optional kwargs to send to the initialization of the bias_handler class decimals : int | None Option to round bias and base data to this number of decimals, this gets passed to np.around(). If decimals is negative, it specifies the number of positions to the left of the decimal point. match_zero_rate : bool Option to fix the frequency of zero values in the biased data. The lowest percentile of values in the biased data will be set to zero to match the percentile of zeros in the base data. If SkillAssessment is being run and this is True, the distributions will not be mean-centered. This helps resolve the issue where global climate models produce too many days with small precipitation totals e.g., the "drizzle problem" [Polade2014]_. References ---------- .. [Polade2014] Polade, S. D., Pierce, D. W., Cayan, D. R., Gershunov, A., & Dettineer, M. D. (2014). The key role of dry days in changing regional climate and precipitation regimes. Scientific reports, 4(1), 4364. """'Initializing DataRetrievalBase for base dset "{}" ' 'correcting biased dataset(s): {}'.format( base_dset, bias_feature)) self.base_fps = base_fps self.bias_fps = bias_fps self.base_dset = base_dset self.bias_feature = bias_feature = target self.shape = shape self.decimals = decimals self.base_handler_kwargs = base_handler_kwargs or {} self.bias_handler_kwargs = bias_handler_kwargs or {} self.bad_bias_gids = [] self._distance_upper_bound = distance_upper_bound self.match_zero_rate = match_zero_rate self.base_fps = expand_paths(self.base_fps) self.bias_fps = expand_paths(self.bias_fps) base_sup3r_handler = getattr(sup3r.preprocessing.data_handling, base_handler, None) base_rex_handler = getattr(rex, base_handler, None) if base_rex_handler is not None: self.base_handler = base_rex_handler self.base_dh = self.base_handler(self.base_fps[0], **self.base_handler_kwargs) elif base_sup3r_handler is not None: self.base_handler = base_sup3r_handler self.base_handler_kwargs['features'] = [self.base_dset] self.base_dh = self.base_handler(self.base_fps, **self.base_handler_kwargs) msg = ('Base data handler opened with a sup3r DataHandler class ' 'must load cached data!') assert is not None, msg else: msg = f'Could not retrieve "{base_handler}" from sup3r or rex!' logger.error(msg) raise RuntimeError(msg) self.bias_handler = getattr(sup3r.preprocessing.data_handling, bias_handler) self.base_meta = self.base_dh.meta self.bias_dh = self.bias_handler(self.bias_fps, [self.bias_feature],, shape=self.shape, val_split=0.0, **self.bias_handler_kwargs) lats = self.bias_dh.lat_lon[..., 0].flatten() self.bias_meta = self.bias_dh.meta self.bias_ti = self.bias_dh.time_index raster_shape = self.bias_dh.lat_lon[..., 0].shape bias_lat_lon = self.bias_meta[['latitude', 'longitude']].values self.bias_tree = KDTree(bias_lat_lon) self.bias_gid_raster = np.arange(lats.size) self.bias_gid_raster = self.bias_gid_raster.reshape(raster_shape) self.nn_dist, self.nn_ind = self.bias_tree.query( self.base_meta[['latitude', 'longitude']], k=1, distance_upper_bound=self.distance_upper_bound) self.out = None self._init_out()'Finished initializing DataRetrievalBase.') @abstractmethod def _init_out(self): """Initialize output arrays""" @property def meta(self): """Get a meta data dictionary on how these bias factors were calculated""" meta = {'base_fps': self.base_fps, 'bias_fps': self.bias_fps, 'base_dset': self.base_dset, 'bias_feature': self.bias_feature, 'target':, 'shape': self.shape, 'class': str(self.__class__), 'version_record': VERSION_RECORD} return meta @property def distance_upper_bound(self): """Maximum distance (float) to map high-resolution data from exo_source to the low-resolution file_paths input.""" if self._distance_upper_bound is None: diff = np.diff(self.bias_meta[['latitude', 'longitude']].values, axis=0) diff = np.max(np.median(diff, axis=0)) self._distance_upper_bound = diff'Set distance upper bound to {:.4f}' .format(self._distance_upper_bound)) return self._distance_upper_bound
[docs] @staticmethod def compare_dists(base_data, bias_data, adder=0, scalar=1): """Compare two distributions using the two-sample Kolmogorov-Smirnov. When the output is minimized, the two distributions are similar. Parameters ---------- base_data : np.ndarray 1D array of base data observations. bias_data : np.ndarray 1D array of biased data observations. adder : float Factor to adjust the biased data before comparing distributions: bias_data * scalar + adder scalar : float Factor to adjust the biased data before comparing distributions: bias_data * scalar + adder Returns ------- out : float KS test statistic """ out = stats.ks_2samp(base_data, bias_data * scalar + adder) return out.statistic
[docs] @classmethod def get_node_cmd(cls, config): """Get a CLI call to call on a single node based on an input config. Parameters ---------- config : dict sup3r bias calc config with all necessary args and kwargs to initialize the class and call run() on a single node. """ import_str = 'import time;\n' import_str += 'from gaps import Status;\n' import_str += 'from rex import init_logger;\n' import_str += f'from sup3r.bias import {cls.__name__};\n' if not hasattr(cls, 'run'): msg = ('I can only get you a node command for subclasses of ' 'DataRetrievalBase with a run() method.') logger.error(msg) raise NotImplementedError(msg) # pylint: disable=E1101 init_str = get_fun_call_str(cls, config) fun_str = get_fun_call_str(, config) fun_str = fun_str.partition('.')[-1] fun_str = 'bc.' + fun_str log_file = config.get('log_file', None) log_level = config.get('log_level', 'INFO') log_arg_str = f'"sup3r", log_level="{log_level}"' if log_file is not None: log_arg_str += f', log_file="{log_file}"' cmd = (f"python -c \'{import_str}\n" "t0 = time.time();\n" f"logger = init_logger({log_arg_str});\n" f"bc = {init_str};\n" f"{fun_str};\n" "t_elap = time.time() - t0;\n") pipeline_step = config.get('pipeline_step') or ModuleName.BIAS_CALC cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd) cmd += ";\'\n" return cmd.replace('\\', '/')
[docs] def get_bias_gid(self, coord): """Get the bias gid from a coordinate. Parameters ---------- coord : tuple (lat, lon) to get data for. Returns ------- bias_gid : int gid of the data to retrieve in the bias data source raster data. The gids for this data source are the enumerated indices of the flattened coordinate array. d : float Distance in decimal degrees from coord to bias gid """ d, i = self.bias_tree.query(coord) bias_gid = self.bias_gid_raster.flatten()[i] return bias_gid, d
[docs] def get_base_gid(self, bias_gid): """Get one or more base gid(s) corresponding to a bias gid. Parameters ---------- bias_gid : int gid of the data to retrieve in the bias data source raster data. The gids for this data source are the enumerated indices of the flattened coordinate array. Returns ------- dist : np.ndarray Array of nearest neighbor distances with length equal to the number of high-resolution baseline gids that map to the low resolution bias gid pixel. base_gid : np.ndarray Array of base gids that are the nearest neighbors of bias_gid with length equal to the number of high-resolution baseline gids that map to the low resolution bias gid pixel. """ base_gid = np.where(self.nn_ind == bias_gid)[0] dist = self.nn_dist[base_gid] return dist, base_gid
[docs] def get_data_pair(self, coord, daily_reduction='avg'): """Get base and bias data observations based on a single bias gid. Parameters ---------- coord : tuple (lat, lon) to get data for. daily_reduction : None | str Option to do a reduction of the hourly+ source base data to daily data. Can be None (no reduction, keep source time frequency), "avg" (daily average), "max" (daily max), "min" (daily min), "sum" (daily sum/total) Returns ------- base_data : np.ndarray 1D array of base data spatially averaged across the base_gid input and possibly daily-averaged or min/max'd as well. bias_data : np.ndarray 1D array of temporal data at the requested gid. base_dist : np.ndarray Array of nearest neighbor distances from coord to the base data sites with length equal to the number of high-resolution baseline gids that map to the low resolution bias gid pixel. bias_dist : Float Nearest neighbor distance from coord to the bias data site """ bias_gid, bias_dist = self.get_bias_gid(coord) base_dist, base_gid = self.get_base_gid(bias_gid) bias_data = self.get_bias_data(bias_gid) base_data = self.get_base_data(self.base_fps, self.base_dset, base_gid, self.base_handler, daily_reduction=daily_reduction, decimals=self.decimals) base_data = base_data[0] return base_data, bias_data, base_dist, bias_dist
[docs] def get_bias_data(self, bias_gid, bias_dh=None): """Get data from the biased data source for a single gid Parameters ---------- bias_gid : int gid of the data to retrieve in the bias data source raster data. The gids for this data source are the enumerated indices of the flattened coordinate array. bias_dh : DataHandler, default=self.bias_dh Any ``DataHandler`` from :mod:`sup3r.preprocessing.data_handling`. This optional argument allows an alternative handler other than the usual :attr:`bias_dh`. For instance, the derived :class:`~qdm.QuantileDeltaMappingCorrection` uses it to access the reference biased dataset as well as the target biased dataset. Returns ------- bias_data : np.ndarray 1D array of temporal data at the requested gid. """ idx = np.where(self.bias_gid_raster == bias_gid) # This can be confusing. If the given argument `bias_dh` is None, # the default value for dh is `self.bias_dh`. dh = bias_dh or self.bias_dh # But the `data` attribute from the handler `dh` can also be None, # and in that case, `load_cached_data()`. if is None: dh.load_cached_data() bias_data =[idx][0] if bias_data.shape[-1] == 1: bias_data = bias_data[:, 0] else: msg = ('Found a weird number of feature channels for the bias ' 'data retrieval: {}. Need just one channel'.format( bias_data.shape)) logger.error(msg) raise RuntimeError(msg) if self.decimals is not None: bias_data = np.around(bias_data, decimals=self.decimals) return bias_data
[docs] @classmethod def get_base_data(cls, base_fps, base_dset, base_gid, base_handler, base_handler_kwargs=None, daily_reduction='avg', decimals=None, base_dh_inst=None): """Get data from the baseline data source, possibly for many high-res base gids corresponding to a single coarse low-res bias gid. Parameters ---------- base_fps : list | str One or more baseline .h5 filepaths representing non-biased data to use to correct the biased dataset. This is typically several years of WTK or NSRDB files. base_dset : str A single dataset from the base_fps to retrieve. base_gid : int | np.ndarray One or more spatial gids to retrieve from base_fps. The data will be spatially averaged across all of these sites. base_handler : rex.Resource A rex data handler similar to rex.Resource or sup3r.DataHandler classes (if using the latter, must also input base_dh_inst) base_handler_kwargs : dict | None Optional kwargs to send to the initialization of the base_handler class daily_reduction : None | str Option to do a reduction of the hourly+ source base data to daily data. Can be None (no reduction, keep source time frequency), "avg" (daily average), "max" (daily max), "min" (daily min), "sum" (daily sum/total) decimals : int | None Option to round bias and base data to this number of decimals, this gets passed to np.around(). If decimals is negative, it specifies the number of positions to the left of the decimal point. base_dh_inst : sup3r.DataHandler Instantiated DataHandler class that has already loaded the base data (required if base files are .nc and are not being opened by a rex Resource handler). Returns ------- out_data : np.ndarray 1D array of base data spatially averaged across the base_gid input and possibly daily-averaged or min/max'd as well. out_ti : pd.DatetimeIndex DatetimeIndex object of datetimes corresponding to the output data. """ out_data = [] out_ti = [] all_cs_ghi = [] base_handler_kwargs = base_handler_kwargs or {} if issubclass(base_handler, DataHandler) and base_dh_inst is None: msg = ('The method `get_base_data()` is only to be used with ' '`base_handler` as a `sup3r.DataHandler` subclass if ' '`base_dh_inst` is also provided!') logger.error(msg) raise RuntimeError(msg) if issubclass(base_handler, DataHandler) and base_dh_inst is not None: out_ti = base_dh_inst.time_index out_data = cls._read_base_sup3r_data(base_dh_inst, base_dset, base_gid) all_cs_ghi = np.ones(len(out_data), dtype=np.float32) * np.nan else: for fp in base_fps: with base_handler(fp, **base_handler_kwargs) as res: base_ti = res.time_index temp_out = cls._read_base_rex_data(res, base_dset, base_gid) base_data, base_cs_ghi = temp_out out_data.append(base_data) out_ti.append(base_ti) all_cs_ghi.append(base_cs_ghi) out_data = np.hstack(out_data) out_ti = pd.DatetimeIndex(np.hstack(out_ti)) all_cs_ghi = np.hstack(all_cs_ghi) if daily_reduction is not None: out_data, out_ti = cls._reduce_base_data(out_ti, out_data, all_cs_ghi, base_dset, daily_reduction) if decimals is not None: out_data = np.around(out_data, decimals=decimals) return out_data, out_ti
@staticmethod def _match_zero_rate(bias_data, base_data): """The lowest percentile of values in the biased data will be set to zero to match the percentile of zeros in the base data. This helps resolve the issue where global climate models produce too many days with small precipitation totals e.g., the "drizzle problem". Ref: Polade et al., 2014 Parameters ---------- bias_data : np.ndarray 1D array of biased data observations. base_data : np.ndarray 1D array of base data observations. Returns ------- bias_data : np.ndarray 1D array of biased data observations. Values below the quantile associated with zeros in base_data will be set to zero """ q_zero_base_in = np.nanmean(base_data == 0) q_zero_bias_in = np.nanmean(bias_data == 0) q_bias = np.linspace(0, 1, len(bias_data)) min_value_bias = np.interp(q_zero_base_in, q_bias, sorted(bias_data)) bias_data[bias_data < min_value_bias] = 0 q_zero_base_out = np.nanmean(base_data == 0) q_zero_bias_out = np.nanmean(bias_data == 0) logger.debug('Input bias/base zero rate is {:.3e}/{:.3e}, ' 'output is {:.3e}/{:.3e}' .format(q_zero_bias_in, q_zero_base_in, q_zero_bias_out, q_zero_base_out)) return bias_data @staticmethod def _read_base_sup3r_data(dh, base_dset, base_gid): """Read baseline data from a sup3r DataHandler Parameters ---------- dh : sup3r.DataHandler sup3r DataHandler that is an open file handler of the base file(s) base_dset : str A single dataset from the base_fps to retrieve. base_gid : int | np.ndarray One or more spatial gids to retrieve from base_fps. The data will be spatially averaged across all of these sites. Returns ------- base_data : np.ndarray 1D array of base data spatially averaged across the base_gid input """ idf = dh.features.index(base_dset) gid_raster = np.arange(len(dh.meta)) gid_raster = gid_raster.reshape(dh.shape[:2]) idy, idx = np.where(np.isin(gid_raster, base_gid)) base_data =[idy, idx, :, idf] assert base_data.shape[0] == len(base_gid) assert base_data.shape[1] == len(dh.time_index) return base_data.mean(axis=0) @staticmethod def _read_base_rex_data(res, base_dset, base_gid): """Read baseline data from a rex resource handler with extra logic for special datasets (e.g. u/v wind components or clearsky_ratio) Parameters ---------- res : rex.Resource rex Resource handler that is an open file handler of the base file(s) base_dset : str A single dataset from the base_fps to retrieve. base_gid : int | np.ndarray One or more spatial gids to retrieve from base_fps. The data will be spatially averaged across all of these sites. Returns ------- base_data : np.ndarray 1D array of base data spatially averaged across the base_gid input base_cs_ghi : np.ndarray If base_dset == "clearsky_ratio", the base_data array is GHI and this base_cs_ghi is clearsky GHI. Otherwise this is an array with same length as base_data but full of np.nan """ msg = '`res` input must not be a `DataHandler` subclass!' assert not issubclass(res.__class__, DataHandler), msg base_cs_ghi = None if base_dset.startswith(('U_', 'V_')): dset_ws = base_dset.replace('U_', 'windspeed_') dset_ws = dset_ws.replace('V_', 'windspeed_') dset_wd = dset_ws.replace('speed', 'direction') base_ws = res[dset_ws, :, base_gid] base_wd = res[dset_wd, :, base_gid] if base_dset.startswith('U_'): base_data = -base_ws * np.sin(np.radians(base_wd)) else: base_data = -base_ws * np.cos(np.radians(base_wd)) elif base_dset == 'clearsky_ratio': base_data = res['ghi', :, base_gid] base_cs_ghi = res['clearsky_ghi', :, base_gid] else: base_data = res[base_dset, :, base_gid] if len(base_data.shape) == 2: base_data = np.nanmean(base_data, axis=1) if base_cs_ghi is not None: base_cs_ghi = np.nanmean(base_cs_ghi, axis=1) if base_cs_ghi is None: base_cs_ghi = np.ones(len(base_data), dtype=np.float32) * np.nan return base_data, base_cs_ghi @staticmethod def _reduce_base_data(base_ti, base_data, base_cs_ghi, base_dset, daily_reduction): """Reduce the base timeseries data using some sort of daily reduction function. Parameters ---------- base_ti : pd.DatetimeIndex Time index associated with base_data base_data : np.ndarray 1D array of base data spatially averaged across the base_gid input base_cs_ghi : np.ndarray If base_dset == "clearsky_ratio", the base_data array is GHI and this base_cs_ghi is clearsky GHI. Otherwise this is an array with same length as base_data but full of np.nan base_dset : str A single dataset from the base_fps to retrieve. daily_reduction : str Option to do a reduction of the hourly+ source base data to daily data. Can be None (no reduction, keep source time frequency), "avg" (daily average), "max" (daily max), "min" (daily min), "sum" (daily sum/total) Returns ------- base_data : np.ndarray 1D array of base data spatially averaged across the base_gid input and possibly daily-averaged or min/max'd as well. daily_ti : pd.DatetimeIndex Daily DatetimeIndex corresponding to the daily base_data """ if daily_reduction is None: return base_data daily_ti = pd.DatetimeIndex(sorted(set( df = pd.DataFrame({'date':, 'base_data': base_data, 'base_cs_ghi': base_cs_ghi}) cs_ratio = (daily_reduction.lower() in ('avg', 'average', 'mean') and base_dset == 'clearsky_ratio') if cs_ratio: daily_ghi = df.groupby('date').sum()['base_data'].values daily_cs_ghi = df.groupby('date').sum()['base_cs_ghi'].values base_data = daily_ghi / daily_cs_ghi msg = ('Could not calculate daily average "clearsky_ratio" with ' 'base_data and base_cs_ghi inputs: \n{}, \n{}' .format(base_data, base_cs_ghi)) assert not np.isnan(base_data).any(), msg elif daily_reduction.lower() in ('avg', 'average', 'mean'): base_data = df.groupby('date').mean()['base_data'].values elif daily_reduction.lower() in ('max', 'maximum'): base_data = df.groupby('date').max()['base_data'].values elif daily_reduction.lower() in ('min', 'minimum'): base_data = df.groupby('date').min()['base_data'].values elif daily_reduction.lower() in ('sum', 'total'): base_data = df.groupby('date').sum()['base_data'].values msg = (f'Daily reduced base data shape {base_data.shape} does not ' f'match daily time index shape {daily_ti.shape}, ' 'something went wrong!') assert len(base_data.shape) == 1, msg assert base_data.shape == daily_ti.shape, msg return base_data, daily_ti
[docs] class LinearCorrection(FillAndSmoothMixin, DataRetrievalBase): """Calculate linear correction *scalar +adder factors to bias correct data This calculation operates on single bias sites for the full time series of available data (no season bias correction) """ NT = 1 """size of the time dimension, 1 is no time-based bias correction""" def _init_out(self): """Initialize output arrays""" keys = [f'{self.bias_feature}_scalar', f'{self.bias_feature}_adder', f'bias_{self.bias_feature}_mean', f'bias_{self.bias_feature}_std', f'base_{self.base_dset}_mean', f'base_{self.base_dset}_std', ] self.out = {k: np.full((*self.bias_gid_raster.shape, self.NT), np.nan, np.float32) for k in keys}
[docs] @staticmethod def get_linear_correction(bias_data, base_data, bias_feature, base_dset): """Get the linear correction factors based on 1D bias and base datasets Parameters ---------- bias_data : np.ndarray 1D array of biased data observations. base_data : np.ndarray 1D array of base data observations. bias_feature : str This is the biased feature from bias_fps to retrieve. This should be a single feature name corresponding to base_dset base_dset : str A single dataset from the base_fps to retrieve. In the case of wind components, this can be U_100m or V_100m which will retrieve windspeed and winddirection and derive the U/V component. Returns ------- out : dict Dictionary of values defining the mean/std of the bias + base data and the scalar + adder factors to correct the biased data like: bias_data * scalar + adder """ bias_std = np.nanstd(bias_data) if bias_std == 0: bias_std = np.nanstd(base_data) scalar = np.nanstd(base_data) / bias_std adder = np.nanmean(base_data) - np.nanmean(bias_data) * scalar out = { f'bias_{bias_feature}_mean': np.nanmean(bias_data), f'bias_{bias_feature}_std': bias_std, f'base_{base_dset}_mean': np.nanmean(base_data), f'base_{base_dset}_std': np.nanstd(base_data), f'{bias_feature}_scalar': scalar, f'{bias_feature}_adder': adder, } return out
# pylint: disable=W0613 @classmethod def _run_single(cls, bias_data, base_fps, bias_feature, base_dset, base_gid, base_handler, daily_reduction, bias_ti, decimals, base_dh_inst=None, match_zero_rate=False): """Find the nominal scalar + adder combination to bias correct data at a single site""" base_data, _ = cls.get_base_data(base_fps, base_dset, base_gid, base_handler, daily_reduction=daily_reduction, decimals=decimals, base_dh_inst=base_dh_inst) if match_zero_rate: bias_data = cls._match_zero_rate(bias_data, base_data) out = cls.get_linear_correction(bias_data, base_data, bias_feature, base_dset) return out
[docs] def write_outputs(self, fp_out, out): """Write outputs to an .h5 file. Parameters ---------- fp_out : str | None Optional .h5 output file to write scalar and adder arrays. out : dict Dictionary of values defining the mean/std of the bias + base data and the scalar + adder factors to correct the biased data like: bias_data * scalar + adder. Each value is of shape (lat, lon, time). """ if fp_out is not None: if not os.path.exists(os.path.dirname(fp_out)): os.makedirs(os.path.dirname(fp_out), exist_ok=True) with h5py.File(fp_out, 'w') as f: # pylint: disable=E1136 lat = self.bias_dh.lat_lon[..., 0] lon = self.bias_dh.lat_lon[..., 1] f.create_dataset('latitude', data=lat) f.create_dataset('longitude', data=lon) for dset, data in out.items(): f.create_dataset(dset, data=data) for k, v in self.meta.items(): f.attrs[k] = json.dumps(v) 'Wrote scalar adder factors to file: {}'.format(fp_out))
[docs] def run(self, fp_out=None, max_workers=None, daily_reduction='avg', fill_extend=True, smooth_extend=0, smooth_interior=0): """Run linear correction factor calculations for every site in the bias dataset Parameters ---------- fp_out : str | None Optional .h5 output file to write scalar and adder arrays. max_workers : int Number of workers to run in parallel. 1 is serial and None is all available. daily_reduction : None | str Option to do a reduction of the hourly+ source base data to daily data. Can be None (no reduction, keep source time frequency), "avg" (daily average), "max" (daily max), "min" (daily min), "sum" (daily sum/total) fill_extend : bool Flag to fill data past distance_upper_bound using spatial nearest neighbor. If False, the extended domain will be left as NaN. smooth_extend : float Option to smooth the scalar/adder data outside of the spatial domain set by the distance_upper_bound input. This alleviates the weird seams far from the domain of interest. This value is the standard deviation for the gaussian_filter kernel smooth_interior : float Option to smooth the scalar/adder data within the valid spatial domain. This can reduce the affect of extreme values within aggregations over large number of pixels. Returns ------- out : dict Dictionary of values defining the mean/std of the bias + base data and the scalar + adder factors to correct the biased data like: bias_data * scalar + adder. Each value is of shape (lat, lon, time). """ logger.debug('Starting linear correction calculation...')'Initialized scalar / adder with shape: {}' .format(self.bias_gid_raster.shape)) self.bad_bias_gids = [] # sup3r DataHandler opening base files will load all data in parallel # during the init and should not be passed in parallel to workers if isinstance(self.base_dh, DataHandler): max_workers = 1 if max_workers == 1: logger.debug('Running serial calculation.') for i, bias_gid in enumerate(self.bias_meta.index): raster_loc = np.where(self.bias_gid_raster == bias_gid) _, base_gid = self.get_base_gid(bias_gid) if not base_gid.any(): self.bad_bias_gids.append(bias_gid) else: bias_data = self.get_bias_data(bias_gid) single_out = self._run_single( bias_data, self.base_fps, self.bias_feature, self.base_dset, base_gid, self.base_handler, daily_reduction, self.bias_ti, self.decimals, base_dh_inst=self.base_dh, match_zero_rate=self.match_zero_rate, ) for key, arr in single_out.items(): self.out[key][raster_loc] = arr'Completed bias calculations for {} out of {} ' 'sites'.format(i + 1, len(self.bias_meta))) else: logger.debug( 'Running parallel calculation with {} workers.'.format( max_workers)) with ProcessPoolExecutor(max_workers=max_workers) as exe: futures = {} for bias_gid in self.bias_meta.index: raster_loc = np.where(self.bias_gid_raster == bias_gid) _, base_gid = self.get_base_gid(bias_gid) if not base_gid.any(): self.bad_bias_gids.append(bias_gid) else: bias_data = self.get_bias_data(bias_gid) future = exe.submit( self._run_single, bias_data, self.base_fps, self.bias_feature, self.base_dset, base_gid, self.base_handler, daily_reduction, self.bias_ti, self.decimals, match_zero_rate=self.match_zero_rate, ) futures[future] = raster_loc logger.debug('Finished launching futures.') for i, future in enumerate(as_completed(futures)): raster_loc = futures[future] single_out = future.result() for key, arr in single_out.items(): self.out[key][raster_loc] = arr'Completed bias calculations for {} out of {} ' 'sites'.format(i + 1, len(futures)))'Finished calculating bias correction factors.') self.out = self.fill_and_smooth(self.out, fill_extend, smooth_extend, smooth_interior) self.write_outputs(fp_out, self.out) return copy.deepcopy(self.out)
[docs] class MonthlyLinearCorrection(LinearCorrection): """Calculate linear correction *scalar +adder factors to bias correct data This calculation operates on single bias sites on a monthly basis """ NT = 12 """size of the time dimension, 12 is monthly bias correction""" @classmethod def _run_single(cls, bias_data, base_fps, bias_feature, base_dset, base_gid, base_handler, daily_reduction, bias_ti, decimals, base_dh_inst=None, match_zero_rate=False): """Find the nominal scalar + adder combination to bias correct data at a single site""" base_data, base_ti = cls.get_base_data(base_fps, base_dset, base_gid, base_handler, daily_reduction=daily_reduction, decimals=decimals, base_dh_inst=base_dh_inst) if match_zero_rate: bias_data = cls._match_zero_rate(bias_data, base_data) base_arr = np.full(cls.NT, np.nan, dtype=np.float32) out = {} for month in range(1, 13): bias_mask = bias_ti.month == month base_mask = base_ti.month == month if any(bias_mask) and any(base_mask): mout = cls.get_linear_correction(bias_data[bias_mask], base_data[base_mask], bias_feature, base_dset) for k, v in mout.items(): if k not in out: out[k] = base_arr.copy() out[k][month - 1] = v return out
[docs] class MonthlyScalarCorrection(MonthlyLinearCorrection): """Calculate linear correction *scalar factors to bias correct data. This typically used when base data is just monthly means and standard deviations cannot be computed. This is case for vortex data, for example. Thus, just scalar factors are computed as mean(base_data) / mean(bias_data). Adder factors are still written but are exactly zero. This calculation operates on single bias sites on a monthly basis """
[docs] @staticmethod def get_linear_correction(bias_data, base_data, bias_feature, base_dset): """Get the linear correction factors based on 1D bias and base datasets Parameters ---------- bias_data : np.ndarray 1D array of biased data observations. base_data : np.ndarray 1D array of base data observations. bias_feature : str This is the biased feature from bias_fps to retrieve. This should be a single feature name corresponding to base_dset base_dset : str A single dataset from the base_fps to retrieve. In the case of wind components, this can be U_100m or V_100m which will retrieve windspeed and winddirection and derive the U/V component. Returns ------- out : dict Dictionary of values defining the mean/std of the bias + base data and the scalar + adder factors to correct the biased data like: bias_data * scalar + adder """ bias_std = np.nanstd(bias_data) if bias_std == 0: bias_std = np.nanstd(base_data) scalar = np.nanmean(base_data) / np.nanmean(bias_data) adder = np.zeros(scalar.shape) out = { f'bias_{bias_feature}_mean': np.nanmean(bias_data), f'bias_{bias_feature}_std': bias_std, f'base_{base_dset}_mean': np.nanmean(base_data), f'base_{base_dset}_std': np.nanstd(base_data), f'{bias_feature}_scalar': scalar, f'{bias_feature}_adder': adder, } return out
[docs] class SkillAssessment(MonthlyLinearCorrection): """Calculate historical skill of one dataset compared to another.""" PERCENTILES = (1, 5, 25, 50, 75, 95, 99) """Data percentiles to report.""" def _init_out(self): """Initialize output arrays""" monthly_keys = [f'{self.bias_feature}_scalar', f'{self.bias_feature}_adder', f'bias_{self.bias_feature}_mean_monthly', f'bias_{self.bias_feature}_std_monthly', f'base_{self.base_dset}_mean_monthly', f'base_{self.base_dset}_std_monthly', ] annual_keys = [f'{self.bias_feature}_ks_stat', f'{self.bias_feature}_ks_p', f'{self.bias_feature}_bias', f'bias_{self.bias_feature}_mean', f'bias_{self.bias_feature}_std', f'bias_{self.bias_feature}_skew', f'bias_{self.bias_feature}_kurtosis', f'bias_{self.bias_feature}_zero_rate', f'base_{self.base_dset}_mean', f'base_{self.base_dset}_std', f'base_{self.base_dset}_skew', f'base_{self.base_dset}_kurtosis', f'base_{self.base_dset}_zero_rate', ] self.out = {k: np.full((*self.bias_gid_raster.shape, self.NT), np.nan, np.float32) for k in monthly_keys} arr = np.full((*self.bias_gid_raster.shape, 1), np.nan, np.float32) for k in annual_keys: self.out[k] = arr.copy() for p in self.PERCENTILES: base_k = f'base_{self.base_dset}_percentile_{p}' bias_k = f'bias_{self.bias_feature}_percentile_{p}' self.out[base_k] = arr.copy() self.out[bias_k] = arr.copy() @classmethod def _run_skill_eval(cls, bias_data, base_data, bias_feature, base_dset, match_zero_rate=False): """Run skill assessment metrics on 1D datasets at a single site. Note we run the KS test on the mean=0 distributions as per: S. Brands et al. 2013 """ if match_zero_rate: bias_data = cls._match_zero_rate(bias_data, base_data) out = {} bias_mean = np.nanmean(bias_data) base_mean = np.nanmean(base_data) out[f'{bias_feature}_bias'] = bias_mean - base_mean out[f'bias_{bias_feature}_mean'] = bias_mean out[f'bias_{bias_feature}_std'] = np.nanstd(bias_data) out[f'bias_{bias_feature}_skew'] = stats.skew(bias_data) out[f'bias_{bias_feature}_kurtosis'] = stats.kurtosis(bias_data) out[f'bias_{bias_feature}_zero_rate'] = np.nanmean(bias_data == 0) out[f'base_{base_dset}_mean'] = base_mean out[f'base_{base_dset}_std'] = np.nanstd(base_data) out[f'base_{base_dset}_skew'] = stats.skew(base_data) out[f'base_{base_dset}_kurtosis'] = stats.kurtosis(base_data) out[f'base_{base_dset}_zero_rate'] = np.nanmean(base_data == 0) if match_zero_rate: ks_out = stats.ks_2samp(base_data, bias_data) else: ks_out = stats.ks_2samp(base_data - base_mean, bias_data - bias_mean) out[f'{bias_feature}_ks_stat'] = ks_out.statistic out[f'{bias_feature}_ks_p'] = ks_out.pvalue for p in cls.PERCENTILES: base_k = f'base_{base_dset}_percentile_{p}' bias_k = f'bias_{bias_feature}_percentile_{p}' out[base_k] = np.percentile(base_data, p) out[bias_k] = np.percentile(bias_data, p) return out @classmethod def _run_single(cls, bias_data, base_fps, bias_feature, base_dset, base_gid, base_handler, daily_reduction, bias_ti, decimals, base_dh_inst=None, match_zero_rate=False): """Do a skill assessment at a single site""" base_data, base_ti = cls.get_base_data(base_fps, base_dset, base_gid, base_handler, daily_reduction=daily_reduction, decimals=decimals, base_dh_inst=base_dh_inst) arr = np.full(cls.NT, np.nan, dtype=np.float32) out = {f'bias_{bias_feature}_mean_monthly': arr.copy(), f'bias_{bias_feature}_std_monthly': arr.copy(), f'base_{base_dset}_mean_monthly': arr.copy(), f'base_{base_dset}_std_monthly': arr.copy(), } out.update(cls._run_skill_eval(bias_data, base_data, bias_feature, base_dset, match_zero_rate=match_zero_rate)) for month in range(1, 13): bias_mask = bias_ti.month == month base_mask = base_ti.month == month if any(bias_mask) and any(base_mask): mout = cls.get_linear_correction(bias_data[bias_mask], base_data[base_mask], bias_feature, base_dset) for k, v in mout.items(): if not k.endswith(('_scalar', '_adder')): k += '_monthly' out[k][month - 1] = v return out