Source code for farms.utilities

"""Common utilities FARMS module.

from copy import deepcopy
import pandas as pd
import numpy as np
import os
from warnings import warn

[docs]def check_range(data, name, rang=(0, 1)): """Ensure that data values are in correct range.""" if np.nanmin(data) < rang[0] or np.nanmax(data) > rang[1]: raise ValueError('Variable "{n}" is out of expected ' 'transmittance/reflectance range. Recommend checking ' 'solar zenith angle to ensure cos(sza) is ' 'non-negative and non-zero. ' 'Max/min of {n} = {mx}/{mn}' .format(n=name, mx=np.nanmax(data), mn=np.nanmin(data)))
[docs]def ti_to_radius_csv(time_index, n_cols=1): """Convert a time index to radius. Parameters ---------- time_index : pandas.core.indexes.datetimes.DatetimeIndex NSRDB time series. Can extract from h5 as follows: time_index = pd.to_datetime(h5['time_index'][...].astype(str)) n_cols : int Number of columns to output. The radius vertical 1D array will be copied this number of times horizontally (np.tile). Returns ------- radius : np.array Array of radius values matching the time index. Shape is (len(time_index), n_cols). """ doy = pd.DataFrame(index=time_index.dayofyear) radius = doy.join(RADIUS) radius = np.tile(radius.values, n_cols) return radius
[docs]def ti_to_radius(time_index, n_cols=1): """Calculates Earth-Sun Radius Vector. Reference: Parameters ---------- time_index : pandas.core.indexes.datetimes.DatetimeIndex NSRDB time series. Can extract from h5 as follows: time_index = pd.to_datetime(h5['time_index'][...].astype(str)) n_cols : int Number of columns to output. The radius vertical 1D array will be copied this number of times horizontally (np.tile). Returns ------- radius : np.array Array of radius values matching the time index. Shape is (len(time_index), n_cols). """ # load earth periodic table path = os.path.dirname(os.path.realpath(__file__)) df = pd.read_csv(os.path.join(path, 'earth_periodic_terms.csv')) df['key'] = 1 # 3.1.1 (4). Julian Date. j = time_index.to_julian_date().values # 3.1.2 (5). Julian Ephermeris Date j = j + 64.797 / 86400 # 3.1.3 (7). Julian Century Ephemeris j = (j - 2451545) / 36525 # 3.1.4 (8). Julian Ephemeris Millennium j = j / 10 df_jme = pd.DataFrame({'uid': range(len(j)), 'jme': j, 'key': 1}) # Merge JME with Periodic Table df_merge = pd.merge(df_jme, df, on='key') # 3.2.1 (9). Heliocentric radius vector. df_merge['r'] = df_merge['a'] * np.cos(df_merge['b'] + df_merge['c'] * df_merge['jme']) # 3.2.2 (10). dfs = df_merge.groupby(by=['uid', 'term'])['r'].sum().unstack() # 3.2.4 (11). Earth Heliocentric radius vector radius = ((dfs['R0'] + dfs['R1'] * j + dfs['R2'] * np.power(j, 2) + dfs['R3'] * np.power(j, 3) + dfs['R4'] * np.power(j, 4) + dfs['R5'] * np.power(j, 5)) / np.power(10, 8)).values radius = radius.reshape((len(time_index), 1)) radius = np.tile(radius, n_cols) return radius
[docs]def calc_beta(aod, alpha): """Calculate the Angstrom turbidity coeff. (beta). Parameters ---------- aod : np.ndarray Array of aerosol optical depth (AOD) values. Shape must match alpha. alpha : np.ndarray Array of angstrom wavelength exponent values. Shape must match aod. Returns ------- beta : np.ndarray Array of Angstrom turbidity coeff., i.e. AOD at 1000 nm. Shape will be same as aod and alpha. Will be tested for compliance with the mandatory interval [0, 2.2]. """ if aod.shape != alpha.shape: raise ValueError('To calculate beta, aod and alpha inputs must be of ' 'the same shape. Received arrays of shape {} and {}' .format(aod.shape, alpha.shape)) beta = aod * np.power(0.55, alpha) if np.max(beta) > 2.2 or np.min(beta) < 0: warn('Calculation of beta resulted in values outside of ' 'expected range [0, 2.2]. Min/max of beta are: {}/{}' .format(np.min(beta), np.max(beta))) return beta
[docs]def calc_dhi(dni, ghi, sza): """Calculate the diffuse horizontal irradiance and correct the direct. Parameters ---------- dni : np.ndarray Direct normal irradiance. ghi : np.ndarray Global horizontal irradiance. sza : np.ndarray Solar zenith angle (degrees). Returns ------- dhi : np.ndarray Diffuse horizontal irradiance. This is ensured to be non-negative. dni : np.ndarray Direct normal irradiance. This is set to zero where dhi < 0 """ dhi = ghi - dni * np.cos(np.radians(sza)) if np.min(dhi) < 0: # patch for negative DHI values. Set DNI to zero, set DHI to GHI pos = np.where(dhi < 0) dni[pos] = 0 dhi[pos] = ghi[pos] return dhi, dni
[docs]def rayleigh(dhi, cs_dhi, fill_flag, rayleigh_flag=7): """Perform Rayleigh violation check (all-sky diffuse >= clearsky diffuse). Decided not to use this in all-sky on 7/3/2019 Failed data gets filled with farms data Parameters ---------- dhi : np.ndarray All-sky diffuse irradiance. cs_dhi : np.ndarray Clearsky (rest) diffuse irradiance. fill_flag : np.ndarray Array of integers signifying whether irradiance has been filled. rayleigh_flag : int Fill flag for rayleigh violation. Returns ------- fill_flag : np.ndarray Array of integers signifying whether irradiance has been filled, with rayleigh violations marked with the rayleigh flag. """ # boolean mask where the rayleigh check fails # (compare agains 99.9% to avoid false positives) failed = (dhi < (0.999 * deepcopy(cs_dhi))) & (cs_dhi > 0) fill_flag[failed] = rayleigh_flag return fill_flag
[docs]def merge_rest_farms(clearsky_irrad, cloudy_irrad, cloud_type): """Combine clearsky and rest data into all-sky irradiance array. Parameters ---------- clearsky_irrad : np.ndarray Clearsky irradiance data from REST. cloudy_irrad : np.ndarray Cloudy irradiance data from FARMS. cloud_type : np.ndarray Cloud type array which acts as a mask specifying where to take cloud/clear data. Returns ------- all_sky_irrad : np.ndarray All-sky (cloudy + clear) irradiance data, merged dataset from FARMS and REST. """ # disable nan warnings np.seterr(divide='ignore', invalid='ignore') # combine clearsky and farms according to the cloud types. all_sky_irrad = np.where(np.isin(cloud_type, CLEAR_TYPES), clearsky_irrad, cloudy_irrad) return all_sky_irrad
[docs]def screen_cld(cld_data, rng=(0, 160)): """Enforce a numeric range on cloud property data. Parameters ---------- cld_data : np.ndarray Cloud property data (cld_opd_dcomp, cld_reff_dcomp). rng : list | tuple Inclusive intended range of the cloud data. Parameters ---------- cld_data : np.ndarray Cloud property data (cld_opd_dcomp, cld_reff_dcomp) with min/max values equal to rng. """ cld_data[np.isnan(cld_data)] = 0 cld_data[(cld_data < rng[0])] = rng[0] cld_data[(cld_data > rng[1])] = rng[1] return cld_data
[docs]def screen_sza(sza, lim=SZA_LIM): """Enforce an upper limit on the solar zenith angle. Parameters ---------- sza : np.ndarray Solar zenith angle in degrees. lim : int | float Upper limit of SZA in degrees. Returns ---------- sza : np.ndarray Solar zenith angle in degrees with max value = lim. """ sza[(sza > lim)] = lim return sza
[docs]def dark_night(irrad_data, sza, lim=SZA_LIM): """Enforce zero irradiance when solar angle >= threshold. Parameters ---------- irrad_data : np.ndarray DHI, DNI, or GHI. sza : np.ndarray Solar zenith angle in degrees. lim : int | float Upper limit of SZA in degrees. Returns ------- irrad_data : np.ndarray DHI, DNI, or GHI with zero values when sza >= lim. """ night_mask = np.where(sza >= lim) irrad_data[night_mask] = 0 return irrad_data
[docs]def cloud_variability(irrad, cs_irrad, cloud_type, var_frac=0.05, distribution='uniform', option='tri', tri_center=0.9, random_seed=123): """Add syntehtic variability to irradiance when it's cloudy. Parameters ---------- irrad : np.ndarray Full FARMS + REST2 merged irradiance 2D array. cs_irrad : np.ndarray REST2 clearsky irradiance without bad or missing data. cloud_type : np.ndarray Array of numerical cloud types. var_frac : float Maximum variability fraction (0.05 is 5% variability) or if distribution is "normal" this is the maximum relative standard deviation of the Variability. distribution : str Distribution shape of the Variability. Can be uniform or normal. option : str Variability function option ('tri' or 'linear'). random_seed : int | NoneType Number to seed the numpy random number generator. Used to generate reproducable psuedo-random cloud variability. Numpy random will be seeded with the system time if this is None. Returns ------- irrad : np.ndarray Full FARMS + REST2 merged irradiance 2D array with variability added to cloudy timesteps. """ # disable divide by zero warnings np.seterr(divide='ignore', invalid='ignore') if var_frac: # set a seed for psuedo-random but repeatable results np.random.seed(seed=random_seed) # update the clearsky ratio (1 is clear, 0 is cloudy or dark) csr = irrad / cs_irrad # Set the cloud/clear ratio to zero when it's nighttime csr[(cs_irrad == 0)] = 0 if distribution == 'uniform': variability_scalar = uniform_variability(csr, cloud_type, var_frac, option=option, tri_center=tri_center) elif distribution == 'normal': variability_scalar = normal_variability(csr, cloud_type, var_frac, option=option, tri_center=tri_center) else: raise ValueError('Did not recognize distribution: {}' .format(distribution)) irrad *= variability_scalar return irrad
[docs]def uniform_variability(csr, cloud_type, var_frac, option='tri', tri_center=0.9): """Get an array with uniform variability scalars centered at 1 that can be multiplied by a irradiance array with the same shape as csr. Parameters ---------- csr : np.ndarray REST2 clearsky irradiance without bad or missing data. This is a 2D array with (time, sites). cloud_type : np.ndarray Array of numerical cloud types. var_frac : float Maximum variability fraction (0.05 is 5% variability). option : str Variability function option ('tri' or 'linear'). tri_center : float Value of the clearsky ratio at which there is maximum variability (only used for the triangular distribution). Returns ------- variability_scalar : np.ndarray Array with shape matching csr with uniform random numbers centered at 1 with range (1 - var_frac) to (1 + var_frac). This array can be multiplied by an irradiance array with the same shape as csr """ if option == 'linear': var_frac_arr = linear_variability(csr, var_frac) elif option == 'tri': var_frac_arr = tri_variability(csr, var_frac, tri_center=tri_center) else: raise ValueError('Did not recognize variability option: {}' .format(option)) # get a uniform random scalar array 0 to 1 with data shape rand_arr = np.random.rand(csr.shape[0], csr.shape[1]) # Center the random array at 1 +/- var_frac_arr (with csr scaling) variability_scalar = 1 + var_frac_arr * (rand_arr * 2 - 1) # only apply rand to the applicable cloudy timesteps variability_scalar = np.where(np.isin(cloud_type, CLOUD_TYPES), variability_scalar, 1) return variability_scalar
[docs]def normal_variability(csr, cloud_type, var_frac, option='tri', tri_center=0.9): """Get an array with a normal distribution of variability scalars centered at 1 that can be multiplied by a irradiance array with the same shape as csr. Parameters ---------- csr : np.ndarray REST2 clearsky irradiance without bad or missing data. This is a 2D array with (time, sites). cloud_type : np.ndarray Array of numerical cloud types. var_frac : float One relative standard deviation variability (0.05 is a relative standard deviation of 5% variability). option : str Variability function option ('tri' or 'linear'). tri_center : float Value of the clearsky ratio at which there is maximum variability (only used for the triangular distribution). Returns ------- variability_scalar : np.ndarray Array with shape matching csr with normally distributed random numbers centered at 1 with range (1 - var_frac) to (1 + var_frac). This array can be multiplied by an irradiance array with the same shape as csr """ if option == 'linear': var_frac_arr = linear_variability(csr, var_frac) elif option == 'tri': var_frac_arr = tri_variability(csr, var_frac, tri_center=tri_center) else: raise ValueError('Did not recognize variability option: {}' .format(option)) # get a normal distribution of data centered at 0 with stdev 1 rand_arr = np.random.normal(loc=0.0, scale=1.0, size=csr.shape) # Center the random array at 1 +/- var_frac_arr (with csr scaling) variability_scalar = 1 + var_frac_arr * rand_arr # only apply rand to the applicable cloudy timesteps variability_scalar = np.where(np.isin(cloud_type, CLOUD_TYPES), variability_scalar, 1) return variability_scalar
[docs]def linear_variability(csr, var_frac): """Return an array with a linear relation between clearsky ratio and maximum variability fraction. Each value in the array is the maximum variability fraction for the corresponding clearsky ratio. Parameters ---------- csr : np.ndarray REST2 clearsky irradiance without bad or missing data. This is a 2D array with (time, sites). var_frac : float Maximum variability fraction (0.05 is 5% variability). Returns ------- out : np.ndarray Array with shape matching csr with maximum variability (var_frac) when the csr = 1 (clear or thin clouds). Each value in the array is the maximum variability fraction for the corresponding clearsky ratio. """ return var_frac * csr
[docs]def tri_variability(csr, var_frac, tri_center=0.9): """Return an array with a triangular distribution between clearsky ratio and maximum variability fraction. Each value in the array is the maximum variability fraction for the corresponding clearsky ratio. The max variability occurs when csr==tri_center, and zero variability when csr==0 or csr==1 Parameters ---------- csr : np.ndarray REST2 clearsky irradiance without bad or missing data. This is a 2D array with (time, sites). var_frac : float Maximum variability fraction (0.05 is 5% variability). tri_center : float Value of the clearsky ratio at which there is maximum variability. Returns ------- tri : np.ndarray Array with shape matching csr with maximum variability (var_frac) when the csr==tri_center. Each value in the array is the maximum variability fraction for the corresponding clearsky ratio. """ tri_left = var_frac * csr * 1.11111 slope = -1 / (1 - tri_center) yint = tri_center * 10 + 1 tri_right = var_frac * (slope * csr + yint) tri = np.where(csr < tri_center, tri_left, tri_right) return tri