"""Utilities to calculate the bias correction factors for biased data that is
going to be fed into the sup3r downscaling models. This is typically used to
bias correct GCM data vs. some historical record like the WTK or NSRDB.

TODO: Generalize the ``with ProcessPoolExecutor() as exe: ...`` pattern so we
don't need to duplicate it wherever we kick off a process or thread pool.
"""
import copy
import json
import logging
import os
from concurrent.futures import ProcessPoolExecutor, as_completed

import h5py
import numpy as np
from scipy import stats

from sup3r.preprocessing import DataHandler

from .base import DataRetrievalBase
from .mixins import FillAndSmoothMixin

logger = logging.getLogger(__name__)


class LinearCorrection(FillAndSmoothMixin, DataRetrievalBase):
"""Calculate linear correction *scalar +adder factors to bias correct data
This calculation operates on single bias sites for the full time series of
available data (no season bias correction)
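
    Examples
    --------
    A hedged usage sketch; the constructor signature comes from
    ``DataRetrievalBase``, and the file paths and feature names here are
    hypothetical::

        >>> lc = LinearCorrection(base_fps='./wtk_2015.h5',
        ...                       bias_fps='./gcm_hist.nc',
        ...                       base_dset='u_100m',
        ...                       bias_feature='u_100m')  # doctest: +SKIP
        >>> out = lc.run(fp_out='./bc_factors.h5')  # doctest: +SKIP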
"""
NT = 1
"""size of the time dimension, 1 is no time-based bias correction"""
def _init_out(self):
"""Initialize output arrays"""
keys = [
f'{self.bias_feature}_scalar',
f'{self.bias_feature}_adder',
f'bias_{self.bias_feature}_mean',
f'bias_{self.bias_feature}_std',
f'base_{self.base_dset}_mean',
f'base_{self.base_dset}_std',
]
self.out = {
k: np.full(
(*self.bias_gid_raster.shape, self.NT), np.nan, np.float32
)
for k in keys
}
@staticmethod
def get_linear_correction(bias_data, base_data, bias_feature, base_dset):
"""Get the linear correction factors based on 1D bias and base datasets
Parameters
----------
bias_data : np.ndarray
1D array of biased data observations.
base_data : np.ndarray
1D array of base data observations.
bias_feature : str
This is the biased feature from bias_fps to retrieve. This should
be a single feature name corresponding to base_dset
base_dset : str
A single dataset from the base_fps to retrieve. In the case of wind
components, this can be u_100m or v_100m which will retrieve
            windspeed and winddirection and derive the U/V component.

        Returns
        -------
out : dict
Dictionary of values defining the mean/std of the bias + base
data and the scalar + adder factors to correct the biased data
like: bias_data * scalar + adder
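
        Examples
        --------
        A small self-contained sketch (the feature names are hypothetical)::

            >>> import numpy as np
            >>> bias = np.array([1.0, 2.0, 3.0])
            >>> base = np.array([2.0, 4.0, 6.0])
            >>> out = LinearCorrection.get_linear_correction(
            ...     bias, base, 'u_100m', 'u_100m')
            >>> bool(np.allclose(bias * out['u_100m_scalar']
            ...                  + out['u_100m_adder'], base))
            True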
"""
bias_std = np.nanstd(bias_data)
if bias_std == 0:
bias_std = np.nanstd(base_data)
scalar = np.nanstd(base_data) / bias_std
adder = np.nanmean(base_data) - np.nanmean(bias_data) * scalar
out = {
f'bias_{bias_feature}_mean': np.nanmean(bias_data),
f'bias_{bias_feature}_std': bias_std,
f'base_{base_dset}_mean': np.nanmean(base_data),
f'base_{base_dset}_std': np.nanstd(base_data),
f'{bias_feature}_scalar': scalar,
f'{bias_feature}_adder': adder,
}
return out
# pylint: disable=W0613
@classmethod
def _run_single(
cls,
bias_data,
base_fps,
bias_feature,
base_dset,
base_gid,
base_handler,
daily_reduction,
bias_ti, # noqa: ARG003
decimals,
base_dh_inst=None,
match_zero_rate=False,
):
"""Find the nominal scalar + adder combination to bias correct data
at a single site"""
base_data, _ = cls.get_base_data(
base_fps,
base_dset,
base_gid,
base_handler,
daily_reduction=daily_reduction,
decimals=decimals,
base_dh_inst=base_dh_inst,
)
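        # optionally set the smallest biased values to zero so the rate of
        # exact-zero values matches the base data (e.g. for precipitation)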
if match_zero_rate:
bias_data = cls._match_zero_rate(bias_data, base_data)
out = cls.get_linear_correction(
bias_data, base_data, bias_feature, base_dset
)
return out
def write_outputs(self, fp_out, out):
"""Write outputs to an .h5 file.
Parameters
----------
fp_out : str | None
Optional .h5 output file to write scalar and adder arrays.
out : dict
Dictionary of values defining the mean/std of the bias + base
data and the scalar + adder factors to correct the biased data
like: bias_data * scalar + adder. Each value is of shape
(lat, lon, time).
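
        Examples
        --------
        Reading factors back from a written file (a sketch; the file and
        dataset names are hypothetical)::

            >>> import h5py
            >>> with h5py.File('./bc_factors.h5', 'r') as f:  # doctest: +SKIP
            ...     scalar = f['u_100m_scalar'][...]
            ...     adder = f['u_100m_adder'][...]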
"""
if fp_out is not None:
            dirname = os.path.dirname(fp_out)
            if dirname:
                os.makedirs(dirname, exist_ok=True)
with h5py.File(fp_out, 'w') as f:
# pylint: disable=E1136
lat = self.bias_dh.lat_lon[..., 0]
lon = self.bias_dh.lat_lon[..., 1]
f.create_dataset('latitude', data=lat)
f.create_dataset('longitude', data=lon)
for dset, data in out.items():
f.create_dataset(dset, data=data)
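                # store run metadata as JSON strings in the h5 file attributes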
for k, v in self.meta.items():
f.attrs[k] = json.dumps(v)
logger.info(
'Wrote scalar adder factors to file: {}'.format(fp_out)
)
def run(
self,
fp_out=None,
max_workers=None,
daily_reduction='avg',
fill_extend=True,
smooth_extend=0,
smooth_interior=0,
):
"""Run linear correction factor calculations for every site in the bias
dataset
Parameters
----------
fp_out : str | None
Optional .h5 output file to write scalar and adder arrays.
max_workers : int
Number of workers to run in parallel. 1 is serial and None is all
available.
daily_reduction : None | str
Option to do a reduction of the hourly+ source base data to daily
data. Can be None (no reduction, keep source time frequency), "avg"
(daily average), "max" (daily max), "min" (daily min),
"sum" (daily sum/total)
fill_extend : bool
Flag to fill data past distance_upper_bound using spatial nearest
neighbor. If False, the extended domain will be left as NaN.
smooth_extend : float
Option to smooth the scalar/adder data outside of the spatial
            domain set by the distance_upper_bound input. This alleviates
            seam artifacts far from the domain of interest. This value is the
            standard deviation for the gaussian_filter kernel
smooth_interior : float
Option to smooth the scalar/adder data within the valid spatial
            domain. This can reduce the effect of extreme values within
            aggregations over a large number of pixels.

        Returns
        -------
out : dict
Dictionary of values defining the mean/std of the bias + base
data and the scalar + adder factors to correct the biased data
like: bias_data * scalar + adder. Each value is of shape
(lat, lon, time).
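
        Examples
        --------
        A hedged sketch, assuming ``lc`` is an initialized
        ``LinearCorrection`` instance::

            >>> out = lc.run(fp_out='./bc_factors.h5', max_workers=4,
            ...              daily_reduction='avg')  # doctest: +SKIP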
"""
logger.debug('Starting linear correction calculation...')
logger.info(
'Initialized scalar / adder with shape: {}'.format(
self.bias_gid_raster.shape
)
)
self.bad_bias_gids = []
# sup3r DataHandler opening base files will load all data in parallel
# during the init and should not be passed in parallel to workers
if isinstance(self.base_dh, DataHandler):
max_workers = 1
if max_workers == 1:
logger.debug('Running serial calculation.')
for i, bias_gid in enumerate(self.bias_meta.index):
raster_loc = np.where(self.bias_gid_raster == bias_gid)
_, base_gid = self.get_base_gid(bias_gid)
if not base_gid.any():
self.bad_bias_gids.append(bias_gid)
else:
bias_data = self.get_bias_data(bias_gid)
single_out = self._run_single(
bias_data,
self.base_fps,
self.bias_feature,
self.base_dset,
base_gid,
self.base_handler,
daily_reduction,
self.bias_ti,
self.decimals,
base_dh_inst=self.base_dh,
match_zero_rate=self.match_zero_rate,
)
for key, arr in single_out.items():
self.out[key][raster_loc] = arr
logger.info(
'Completed bias calculations for {} out of {} '
'sites'.format(i + 1, len(self.bias_meta))
)
else:
logger.debug(
'Running parallel calculation with {} workers.'.format(
max_workers
)
)
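            # map each future to its raster location so results can be
            # written into the output arrays as they complete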
with ProcessPoolExecutor(max_workers=max_workers) as exe:
futures = {}
for bias_gid in self.bias_meta.index:
raster_loc = np.where(self.bias_gid_raster == bias_gid)
_, base_gid = self.get_base_gid(bias_gid)
if not base_gid.any():
self.bad_bias_gids.append(bias_gid)
else:
bias_data = self.get_bias_data(bias_gid)
future = exe.submit(
self._run_single,
bias_data,
self.base_fps,
self.bias_feature,
self.base_dset,
base_gid,
self.base_handler,
daily_reduction,
self.bias_ti,
self.decimals,
match_zero_rate=self.match_zero_rate,
)
futures[future] = raster_loc
logger.debug('Finished launching futures.')
for i, future in enumerate(as_completed(futures)):
raster_loc = futures[future]
single_out = future.result()
for key, arr in single_out.items():
self.out[key][raster_loc] = arr
logger.info(
'Completed bias calculations for {} out of {} '
'sites'.format(i + 1, len(futures))
)
logger.info('Finished calculating bias correction factors.')
self.out = self.fill_and_smooth(
self.out, fill_extend, smooth_extend, smooth_interior
)
self.write_outputs(fp_out, self.out)
return copy.deepcopy(self.out)


class ScalarCorrection(LinearCorrection):
"""Calculate annual linear correction *scalar factors to bias correct data.
This typically used when base data is just monthly or annual means and
standard deviations cannot be computed. This is case for vortex data, for
example. Thus, just scalar factors are computed as mean(base_data) /
mean(bias_data). Adder factors are still written but are exactly zero.
This calculation operates on single bias sites on a monthly basis
"""
@staticmethod
def get_linear_correction(bias_data, base_data, bias_feature, base_dset):
"""Get the linear correction factors based on 1D bias and base datasets
Parameters
----------
bias_data : np.ndarray
1D array of biased data observations.
base_data : np.ndarray
1D array of base data observations.
bias_feature : str
This is the biased feature from bias_fps to retrieve. This should
be a single feature name corresponding to base_dset
base_dset : str
A single dataset from the base_fps to retrieve. In the case of wind
components, this can be u_100m or v_100m which will retrieve
            windspeed and winddirection and derive the U/V component.

        Returns
        -------
        out : dict
            Dictionary of values defining the mean of the bias + base data
            and the scalar + adder factors to correct the biased data like:
            bias_data * scalar + adder. Note that the adder factors are
            always zero for this class.
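
        Examples
        --------
        A small self-contained sketch (the feature names are hypothetical)::

            >>> import numpy as np
            >>> bias = np.array([1.0, 2.0, 3.0])
            >>> base = np.array([2.0, 4.0, 6.0])
            >>> out = ScalarCorrection.get_linear_correction(
            ...     bias, base, 'u_100m', 'u_100m')
            >>> float(out['u_100m_scalar']), float(out['u_100m_adder'])
            (2.0, 0.0)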
"""
base_mean = np.nanmean(base_data)
bias_mean = np.nanmean(bias_data)
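        # note: this assumes the biased data has a nonzero mean; a zero bias
        # mean would make the scalar undefined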
scalar = base_mean / bias_mean
adder = np.zeros(scalar.shape)
out = {
f'bias_{bias_feature}_mean': bias_mean,
f'base_{base_dset}_mean': base_mean,
f'{bias_feature}_scalar': scalar,
f'{bias_feature}_adder': adder,
}
return out


class MonthlyLinearCorrection(LinearCorrection):
"""Calculate linear correction *scalar +adder factors to bias correct data
This calculation operates on single bias sites on a monthly basis
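
    Examples
    --------
    Monthly factors come from masking the time index one month at a time (a
    sketch of the masking logic)::

        >>> import pandas as pd
        >>> ti = pd.date_range('2015-01-01', '2015-12-31', freq='D')
        >>> int((ti.month == 1).sum())
        31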
"""
NT = 12
"""size of the time dimension, 12 is monthly bias correction"""
@classmethod
def _run_single(
cls,
bias_data,
base_fps,
bias_feature,
base_dset,
base_gid,
base_handler,
daily_reduction,
bias_ti,
decimals,
base_dh_inst=None,
match_zero_rate=False,
):
"""Find the nominal scalar + adder combination to bias correct data
at a single site"""
base_data, base_ti = cls.get_base_data(
base_fps,
base_dset,
base_gid,
base_handler,
daily_reduction=daily_reduction,
decimals=decimals,
base_dh_inst=base_dh_inst,
)
if match_zero_rate:
bias_data = cls._match_zero_rate(bias_data, base_data)
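        # template of NT monthly slots; months without overlapping data are
        # left as NaN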
base_arr = np.full(cls.NT, np.nan, dtype=np.float32)
out = {}
for month in range(1, 13):
bias_mask = bias_ti.month == month
base_mask = base_ti.month == month
            if bias_mask.any() and base_mask.any():
mout = cls.get_linear_correction(
bias_data[bias_mask],
base_data[base_mask],
bias_feature,
base_dset,
)
for k, v in mout.items():
if k not in out:
out[k] = base_arr.copy()
out[k][month - 1] = v
return out


class MonthlyScalarCorrection(MonthlyLinearCorrection, ScalarCorrection):
"""Calculate linear correction *scalar factors for each month"""
NT = 12


class SkillAssessment(MonthlyLinearCorrection):
"""Calculate historical skill of one dataset compared to another."""
PERCENTILES = (1, 5, 25, 50, 75, 95, 99)
"""Data percentiles to report."""
def _init_out(self):
"""Initialize output arrays"""
monthly_keys = [
f'{self.bias_feature}_scalar',
f'{self.bias_feature}_adder',
f'bias_{self.bias_feature}_mean_monthly',
f'bias_{self.bias_feature}_std_monthly',
f'base_{self.base_dset}_mean_monthly',
f'base_{self.base_dset}_std_monthly',
]
annual_keys = [
f'{self.bias_feature}_ks_stat',
f'{self.bias_feature}_ks_p',
f'{self.bias_feature}_bias',
f'bias_{self.bias_feature}_mean',
f'bias_{self.bias_feature}_std',
f'bias_{self.bias_feature}_skew',
f'bias_{self.bias_feature}_kurtosis',
f'bias_{self.bias_feature}_zero_rate',
f'base_{self.base_dset}_mean',
f'base_{self.base_dset}_std',
f'base_{self.base_dset}_skew',
f'base_{self.base_dset}_kurtosis',
f'base_{self.base_dset}_zero_rate',
]
self.out = {
k: np.full(
(*self.bias_gid_raster.shape, self.NT), np.nan, np.float32
)
for k in monthly_keys
}
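        # annual statistics get a single time slot regardless of NT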
arr = np.full((*self.bias_gid_raster.shape, 1), np.nan, np.float32)
for k in annual_keys:
self.out[k] = arr.copy()
for p in self.PERCENTILES:
base_k = f'base_{self.base_dset}_percentile_{p}'
bias_k = f'bias_{self.bias_feature}_percentile_{p}'
self.out[base_k] = arr.copy()
self.out[bias_k] = arr.copy()
@classmethod
def _run_skill_eval(
cls,
bias_data,
base_data,
bias_feature,
base_dset,
match_zero_rate=False,
):
"""Run skill assessment metrics on 1D datasets at a single site.
Note we run the KS test on the mean=0 distributions as per:
S. Brands et al. 2013 https://doi.org/10.1007/s00382-013-1742-8
"""
if match_zero_rate:
bias_data = cls._match_zero_rate(bias_data, base_data)
out = {}
bias_mean = np.nanmean(bias_data)
base_mean = np.nanmean(base_data)
out[f'{bias_feature}_bias'] = bias_mean - base_mean
out[f'bias_{bias_feature}_mean'] = bias_mean
out[f'bias_{bias_feature}_std'] = np.nanstd(bias_data)
out[f'bias_{bias_feature}_skew'] = stats.skew(bias_data)
out[f'bias_{bias_feature}_kurtosis'] = stats.kurtosis(bias_data)
out[f'bias_{bias_feature}_zero_rate'] = np.nanmean(bias_data == 0)
out[f'base_{base_dset}_mean'] = base_mean
out[f'base_{base_dset}_std'] = np.nanstd(base_data)
out[f'base_{base_dset}_skew'] = stats.skew(base_data)
out[f'base_{base_dset}_kurtosis'] = stats.kurtosis(base_data)
out[f'base_{base_dset}_zero_rate'] = np.nanmean(base_data == 0)
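        # running the KS test on mean-removed data isolates differences in
        # distribution shape from simple mean bias (Brands et al. 2013); when
        # zero rates were matched, compare raw data so the zeros stay aligned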
if match_zero_rate:
ks_out = stats.ks_2samp(base_data, bias_data)
else:
ks_out = stats.ks_2samp(
base_data - base_mean, bias_data - bias_mean
)
out[f'{bias_feature}_ks_stat'] = ks_out.statistic
out[f'{bias_feature}_ks_p'] = ks_out.pvalue
for p in cls.PERCENTILES:
base_k = f'base_{base_dset}_percentile_{p}'
bias_k = f'bias_{bias_feature}_percentile_{p}'
out[base_k] = np.percentile(base_data, p)
out[bias_k] = np.percentile(bias_data, p)
return out
@classmethod
def _run_single(
cls,
bias_data,
base_fps,
bias_feature,
base_dset,
base_gid,
base_handler,
daily_reduction,
bias_ti,
decimals,
base_dh_inst=None,
match_zero_rate=False,
):
"""Do a skill assessment at a single site"""
base_data, base_ti = cls.get_base_data(
base_fps,
base_dset,
base_gid,
base_handler,
daily_reduction=daily_reduction,
decimals=decimals,
base_dh_inst=base_dh_inst,
)
arr = np.full(cls.NT, np.nan, dtype=np.float32)
        out = {
            f'bias_{bias_feature}_mean_monthly': arr.copy(),
            f'bias_{bias_feature}_std_monthly': arr.copy(),
            f'base_{base_dset}_mean_monthly': arr.copy(),
            f'base_{base_dset}_std_monthly': arr.copy(),
            # the monthly loop below writes into these keys, so they must be
            # initialized here to avoid a KeyError
            f'{bias_feature}_scalar': arr.copy(),
            f'{bias_feature}_adder': arr.copy(),
        }
out.update(
cls._run_skill_eval(
bias_data,
base_data,
bias_feature,
base_dset,
match_zero_rate=match_zero_rate,
)
)
for month in range(1, 13):
bias_mask = bias_ti.month == month
base_mask = base_ti.month == month
            if bias_mask.any() and base_mask.any():
mout = cls.get_linear_correction(
bias_data[bias_mask],
base_data[base_mask],
bias_feature,
base_dset,
)
for k, v in mout.items():
if not k.endswith(('_scalar', '_adder')):
k += '_monthly'
out[k][month - 1] = v
return out