Source code for rex.utilities.bc_utils

# -*- coding: utf-8 -*-
"""
rex bias correction utilities.
"""
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
import logging
import numpy as np
import scipy

logger = logging.getLogger(__name__)


[docs] def sample_q_linear(n_samples): """Sample quantiles from 0 to 1 inclusive linearly with even spacing Parameters ---------- n_samples : int Number of points to sample between 0 and 1 Returns ------- quantiles : np.ndarray 1D array of evenly spaced samples from 0 to 1 """ quantiles = np.linspace(0, 1, n_samples) return quantiles
[docs] def sample_q_log(n_samples, log_base): """Sample quantiles from 0 to 1 while concentrating samples near quantile=0 Parameters ---------- n_samples : int Number of points to sample between 0 and 1 log_base : int | float Log base value. A higher value will concentrate more samples at the extreme sides of the distribution. Returns ------- quantiles : np.ndarray 1D array of log-spaced samples from 0 to 1 """ quantiles = np.logspace(0, 1, n_samples, base=log_base) quantiles = (quantiles - 1) / (log_base - 1) return quantiles
[docs] def sample_q_invlog(n_samples, log_base): """Sample quantiles from 0 to 1 while concentrating samples near quantile=1 Parameters ---------- n_samples : int Number of points to sample between 0 and 1 log_base : int | float Log base value. A higher value will concentrate more samples at the extreme sides of the distribution. Returns ------- quantiles : np.ndarray 1D array of log-spaced samples from 0 to 1 """ quantiles = np.logspace(0, 1, n_samples, base=log_base) quantiles = (quantiles - 1) / (log_base - 1) quantiles = np.array(sorted(1 - quantiles)) return quantiles
[docs] class QuantileDeltaMapping: """Class for quantile delta mapping based on the method from Cannon et al., 2015 Note that this is a utility class for implementing QDM and should not be requested directly as a method in the reV/rex bias correction table input Cannon, A. J., Sobie, S. R. & Murdock, T. Q. Bias Correction of GCM Precipitation by Quantile Mapping: How Well Do Methods Preserve Changes in Quantiles and Extremes? Journal of Climate 28, 6938–6959 (2015). """ def __init__(self, params_oh, params_mh, params_mf, dist='empirical', relative=True, sampling='linear', log_base=10, delta_denom_min=None, delta_denom_zero=None): """ Parameters ---------- params_oh : np.ndarray 2D array of **observed historical** distribution parameters created from a multi-year set of data where the shape is (space, N). This can be the output of a parametric distribution fit like ``scipy.stats.weibull_min.fit()`` where N is the number of parameters for that distribution, or this can define the x-values of N points from an empirical CDF that will be linearly interpolated between. If this is an empirical CDF, this must include the 0th and 100th percentile values and have even percentile spacing between values. params_mh : np.ndarray Same requirements as params_oh. This input arg is for the **modeled historical distribution**. params_mf : np.ndarray | None Same requirements as params_oh. This input arg is for the **modeled future distribution**. If this is None, this defaults to params_mh (no future data, just corrected to modeled historical distribution) dist : str | np.ndarray Probability distribution name to use to model the data which determines how the param args are used. This can "empirical" or any continuous distribution name from ``scipy.stats``. Can also be a 1D array of dist inputs if being used from reV, but they must all be the same option. relative : bool | np.ndarray Flag to preserve relative rather than absolute changes in quantiles. relative=False (default) will multiply by the change in quantiles while relative=True will add. See Equations 4-6 from Cannon et al., 2015 for more details. Can also be a 1D array of dist inputs if being used from reV, but they must all be the same option. sampling : str | np.ndarray If dist="empirical", this is an option for how the quantiles were sampled to produce the params inputs, e.g., how to sample the y-axis of the distribution (see sampling functions in ``rex.utilities.bc_utils``). "linear" will do even spacing, "log" will concentrate samples near quantile=0, and "invlog" will concentrate samples near quantile=1. Can also be a 1D array of dist inputs if being used from reV, but they must all be the same option. log_base : int | float | np.ndarray Log base value if sampling is "log" or "invlog". A higher value will concentrate more samples at the extreme sides of the distribution. Can also be a 1D array of dist inputs if being used from reV, but they must all be the same option. delta_denom_min : float | None Option to specify a minimum value for the denominator term in the calculation of a relative delta value. This prevents division by a very small number making delta blow up and resulting in very large output bias corrected values. See equation 4 of Cannon et al., 2015 for the delta term. delta_denom_zero : float | None Option to specify a value to replace zeros in the denominator term in the calculation of a relative delta value. This prevents division by a very small number making delta blow up and resulting in very large output bias corrected values. See equation 4 of Cannon et al., 2015 for the delta term. """ self.params_oh = params_oh self.params_mh = params_mh self.params_mf = params_mf if params_mf is not None else params_mh self.relative = bool(self._clean_kwarg(relative)) self.dist_name = str(self._clean_kwarg(dist)).casefold() self.sampling = str(self._clean_kwarg(sampling)).casefold() self.log_base = float(self._clean_kwarg(log_base)) self.scipy_dist = None self.delta_denom_min = delta_denom_min self.delta_denom_zero = delta_denom_zero if self.dist_name != 'empirical': self.scipy_dist = getattr(scipy.stats, self.dist_name, None) if self.scipy_dist is None: msg = ('Could not get requested distribution "{}" from ' '``scipy.stats``. Please double check your spelling ' 'and select "empirical" or one of the continuous ' 'distribution options from here: ' 'https://docs.scipy.org/doc/scipy/reference/stats.html' .format(self.dist_name)) logger.error(msg) raise KeyError(msg) @staticmethod def _clean_kwarg(inp): """Clean any kwargs inputs (e.g., dist, relative) that might be provided as an array and must be collapsed into a single string or boolean value""" unique = np.unique(inp) msg = ('_QuantileDeltaMapping kwargs must have only one unique input ' 'even if being called with arrays as part of reV but found: {}' .format(unique)) assert len(unique) == 1, msg while isinstance(inp, np.ndarray): inp = inp[0] return inp @staticmethod def _clean_params(params, arr_shape, scipy_dist): """Verify and clean 2D parameter arrays for passing into empirical distribution or scipy continuous distribution functions. Parameters ---------- params : np.ndarray Input params shape should be (space, N) where N is the number of parameters for the distribution. arr_shape : tuple Array shape should be (time, space). scipy_dist : scipy.stats.rv_continuous | None Any continuous distribution class from ``scipy.stats`` or None if using an empirical distribution (taken from attribute ``QuantileDeltaMapping.scipy_dist``) Returns ------- params : np.ndarray | list If a scipy continuous dist is set, this output will be params unpacked along axis=1 into a list so that the list entries represent the scipy distribution parameters (e.g., shape, scale, loc) and each list entry is of shape (space,) """ msg = f'params must be 2D array but received {type(params)}' assert hasattr(params, 'shape'), msg if len(params.shape) == 1: params = np.expand_dims(params, 0) msg = (f'params must be 2D array of shape ({arr_shape[1]}, N) ' f'but received shape {params.shape}') assert len(params.shape) == 2, msg assert params.shape[0] == arr_shape[1], msg if scipy_dist is not None: params = [params[:, i] for i in range(params.shape[1])] return params @staticmethod def _get_quantiles(n_samples, sampling, log_base): """If dist='empirical', this will get the quantile values for the CDF x-values specified in the input params""" if sampling == 'linear': quantiles = sample_q_linear(n_samples) elif sampling == 'log': quantiles = sample_q_log(n_samples, log_base) elif sampling == 'invlog': quantiles = sample_q_invlog(n_samples, log_base) else: msg = ('sampling option must be linear, log, or invlog, but ' 'received: {}'.format(sampling)) logger.error(msg) raise KeyError(msg) return quantiles
[docs] @classmethod def cdf(cls, x, params, scipy_dist, sampling, log_base): """Run the CDF function e.g., convert physical variable to quantile""" if scipy_dist is None: p = np.zeros_like(x) for idx in range(x.shape[1]): xp = params[idx, :] fp = cls._get_quantiles(len(xp), sampling, log_base) p[:, idx] = np.interp(x[:, idx], xp, fp) else: p = scipy_dist.cdf(x, *params) return p
[docs] @classmethod def ppf(cls, p, params, scipy_dist, sampling, log_base): """Run the inverse CDF function (percent point function) e.g., convert quantile to physical variable""" if scipy_dist is None: x = np.zeros_like(p) for idx in range(p.shape[1]): fp = params[idx, :] xp = cls._get_quantiles(len(fp), sampling, log_base) x[:, idx] = np.interp(p[:, idx], xp, fp) else: x = scipy_dist.ppf(p, *params) return x
[docs] @classmethod def run_qdm(cls, arr, params_oh, params_mh, params_mf, scipy_dist, relative, sampling, log_base, delta_denom_min, delta_denom_zero): """Run the actual QDM operation from args without initializing the ``QuantileDeltaMapping`` object Parameters ---------- arr : np.ndarray 2D array of values in shape (time, space) params_oh : np.ndarray 2D array of **observed historical** distribution parameters created from a multi-year set of data where the shape is (space, N). This can be the output of a parametric distribution fit like ``scipy.stats.weibull_min.fit()`` where N is the number of parameters for that distribution, or this can define the x-values of N points from an empirical CDF that will be linearly interpolated between. If this is an empirical CDF, this must include the 0th and 100th percentile values and have even percentile spacing between values. params_mh : np.ndarray Same requirements as params_oh. This input arg is for the **modeled historical distribution**. params_mf : np.ndarray Same requirements as params_oh. This input arg is for the **modeled future distribution**. scipy_dist : scipy.stats.rv_continuous | None Any continuous distribution class from ``scipy.stats`` or None if using an empirical distribution (taken from attribute ``QuantileDeltaMapping.scipy_dist``) relative : bool | np.ndarray Flag to preserve relative rather than absolute changes in quantiles. relative=False (default) will multiply by the change in quantiles while relative=True will add. See Equations 4-6 from Cannon et al., 2015 for more details. Can also be a 1D array of dist inputs if being used from reV, but they must all be the same option. sampling : str | np.ndarray If dist="empirical", this is an option for how the quantiles were sampled to produce the params inputs, e.g., how to sample the y-axis of the distribution (see sampling functions in ``rex.utilities.bc_utils``). "linear" will do even spacing, "log" will concentrate samples near quantile=0, and "invlog" will concentrate samples near quantile=1. Can also be a 1D array of dist inputs if being used from reV, but they must all be the same option. log_base : int | float | np.ndarray Log base value if sampling is "log" or "invlog". A higher value will concentrate more samples at the extreme sides of the distribution. Can also be a 1D array of dist inputs if being used from reV, but they must all be the same option. delta_denom_min : float | None Option to specify a minimum value for the denominator term in the calculation of a relative delta value. This prevents division by a very small number making delta blow up and resulting in very large output bias corrected values. See equation 4 of Cannon et al., 2015 for the delta term. delta_denom_zero : float | None Option to specify a value to replace zeros in the denominator term in the calculation of a relative delta value. This prevents division by a very small number making delta blow up and resulting in very large output bias corrected values. See equation 4 of Cannon et al., 2015 for the delta term. Returns ------- arr : np.ndarray Bias corrected copy of the input array with same shape. """ params_oh = cls._clean_params(params_oh, arr.shape, scipy_dist) params_mh = cls._clean_params(params_mh, arr.shape, scipy_dist) params_mf = cls._clean_params(params_mf, arr.shape, scipy_dist) # Equation references are from Section 3 of Cannon et al 2015: # Cannon, A. J., Sobie, S. R. & Murdock, T. Q. Bias Correction of GCM # Precipitation by Quantile Mapping: How Well Do Methods Preserve # Changes in Quantiles and Extremes? Journal of Climate 28, 6938–6959 # (2015). logger.debug('Computing CDF on modeled future data') # Eq.3: Tau_m_p = F_m_p(x_m_p) q_mf = cls.cdf(arr, params_mf, scipy_dist, sampling, log_base) logger.debug('Computing PPF on observed historical data') # Eq.5: x^_o:m_h:p = F-1_o_h(Tau_m_p) x_oh = cls.ppf(q_mf, params_oh, scipy_dist, sampling, log_base) logger.debug('Computing PPF on modeled historical data') # Eq.4 denom: F-1_m_h(Tau_m_p) x_mh_mf = cls.ppf(q_mf, params_mh, scipy_dist, sampling, log_base) logger.debug('Finished computing distributions.') if relative: if delta_denom_zero is not None: x_mh_mf[x_mh_mf == 0] = delta_denom_zero elif delta_denom_min is not None: x_mh_mf = np.maximum(x_mh_mf, delta_denom_min) delta = arr / x_mh_mf # Eq.4: x_m_p / F-1_m_h(Tau_m_p) arr_bc = x_oh * delta # Eq.6: x^_m_p = x^_o:m_h:p * delta else: delta = arr - x_mh_mf # Eq.4: x_m_p - F-1_m_h(Tau_m_p) arr_bc = x_oh + delta # Eq.6: x^_m_p = x^_o:m_h:p + delta return arr_bc
[docs] def __call__(self, arr, max_workers=1): """Run the QDM function to bias correct an array Parameters ---------- arr : np.ndarray 2D array of values in shape (time, space) max_workers : int, None Number of parallel workers to use in QDM bias correction. 1 will run in serial (default), None will use all available cores. Returns ------- arr : np.ndarray Bias corrected copy of the input array with same shape. """ if len(arr.shape) == 1: arr = np.expand_dims(arr, 1) if max_workers == 1: arr_bc = self.run_qdm(arr, self.params_oh, self.params_mh, self.params_mf, self.scipy_dist, self.relative, self.sampling, self.log_base, self.delta_denom_min, self.delta_denom_zero) else: max_workers = max_workers or os.cpu_count() sslices = np.array_split(np.arange(arr.shape[1]), arr.shape[1]) sslices = [slice(idx[0], idx[-1] + 1) for idx in sslices] arr_bc = arr.copy() futures = {} with ProcessPoolExecutor(max_workers=max_workers) as exe: for idx in range(arr.shape[1]): idx = slice(idx, idx + 1) fut = exe.submit(self.run_qdm, arr[:, idx], self.params_oh[idx], self.params_mh[idx], self.params_mf[idx], self.scipy_dist, self.relative, self.sampling, self.log_base, self.delta_denom_min, self.delta_denom_zero) futures[fut] = idx for future in as_completed(futures): idx = futures[future] arr_bc[:, idx] = future.result() msg = ('Input shape {} does not match QDM bias corrected output ' 'shape {}!'.format(arr.shape, arr_bc.shape)) assert arr.shape == arr_bc.shape, msg return arr_bc