# -*- coding: utf-8 -*-
General Joint Probabilty Distribution calculator
from concurrent.futures import as_completed
import gc
import logging
import h5py
import numpy as np
import os
import pandas as pd
from warnings import warn

from rex.version import __version__
from rex.renewable_resource import WindResource
from rex.resource import Resource
from rex.utilities.execution import SpawnProcessPool
from rex.utilities.loggers import log_mem, log_versions
from rex.utilities.utilities import slice_sites, to_records_array

logger = logging.getLogger(__name__)

[docs] class JointPD: """ Compute the joint probability distribution between the desired variables """ def __init__(self, res_h5, res_cls=Resource, hsds=False): """ Parameters ---------- res_h5 : str Path to resource h5 file(s) res_cls : Class, optional Resource handler class to use to access res_h5, by default Resource hsds : bool, optional Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS behind HSDS, by default False """ log_versions(logger) self._res_h5 = res_h5 self._res_cls = res_cls self._hsds = hsds @property def res_h5(self): """ Path to resource h5 file(s) Returns ------- str """ return self._res_h5 @property def res_cls(self): """ Resource class to use to access wind_h5 Returns ------- Class """ return self._res_cls @staticmethod def _make_bins(start, stop, step): """ Create bin edges from bin range Parameters ---------- start : int bin range start value stop : int bin range stop value step : int bin range step value Returns ------- bin_edges : ndarray Vector of inclusive bin edges """ bin_edges = np.arange(start, stop + step, step) return bin_edges
[docs] @classmethod def compute_joint_pd(cls, res_h5, dset1, dset2, bins1, bins2, res_cls=Resource, hsds=False, sites_slice=None): """ Compute the joint probability distribution between the two given datasets using the given bins for given sites Parameters ---------- res_h5 : str Path to resource h5 file(s) dset1 : str Dataset 1 to generate joint probability distribution for dset2 : str Dataset 2 to generate joint probabilty distribution for bins1 : tuple (start, stop, step) for dataset 1 bins. The stop value is inclusive, so (0, 6, 2) would yield three bins with edges (0, 2, 4, 6). If the stop value is not perfectly divisible by the step, the last bin will overshoot the stop value. bins2 : tuple (start, stop, step) for dataset 2 bins. The stop value is inclusive, so (0, 6, 2) would yield three bins with edges (0, 2, 4, 6). If the stop value is not perfectly divisible by the step, the last bin will overshoot the stop value. res_cls : Class, optional Resource handler class to use to access res_h5, by default Resource hsds : bool, optional Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS behind HSDS, by default False sites_slice : slice | None, optional Sites to extract, if None all, by default None (sites is synonymous with gids aka spatial indices) Returns ------- jpd : dict Dictionary of joint probabilty distribution densities for given sites """ if sites_slice is None: sites_slice = slice(None, None, None) elif isinstance(sites_slice, int): sites_slice = [sites_slice] with res_cls(res_h5, hsds=hsds) as f: arr1 = f[dset1, :, sites_slice] arr2 = f[dset2, :, sites_slice] bins1 = cls._make_bins(*bins1) bins2 = cls._make_bins(*bins2) if isinstance(sites_slice, slice) and sites_slice.stop: gids = list(range(*sites_slice.indices(sites_slice.stop))) elif isinstance(sites_slice, (list, np.ndarray)): gids = sites_slice jpd = {} for i, (a1, a2) in enumerate(zip(arr1.T, arr2.T)): jpd[gids[i]] = np.histogram2d(a1, a2, bins=(bins1, bins2), density=True)[0].astype(np.float32) return jpd
def _get_slices(self, dset1, dset2, sites=None, chunks_per_slice=5): """ Get slices to extract, ensure the shapes of dset1 and 2 match. Parameters ---------- dset1 : str Dataset 1 to generate joint probability distribution for dset2 : str Dataset 2 to generate joint probabilty distribution for sites : list | slice, optional Subset of sites to extract, by default None or all sites (sites is synonymous with gids aka spatial indices) chunks_per_slice : int, optional Number of chunks to extract in each slice, by default 5 Returns ------- slices : list List of slices to extract """ with self.res_cls(self.res_h5) as f: shape, _, chunks = f.get_dset_properties(dset1) shape2, _, _ = f.get_dset_properties(dset2) if shape != shape2: msg = ("The shape of {}: {}, does not match the shape of {}: {}!" .format(dset1, shape, dset2, shape2)) logger.error(msg) raise RuntimeError(msg) slices = slice_sites(shape, chunks, sites=sites, chunks_per_slice=chunks_per_slice) return slices
[docs] def compute(self, dset1, dset2, bins1, bins2, sites=None, max_workers=None, chunks_per_worker=5): """ Compute joint probability distribution between given datasets using given bins for all sites. Parameters ---------- dset1 : str Dataset 1 to generate joint probability distribution for dset2 : str Dataset 2 to generate joint probabilty distribution for bins1 : tuple (start, stop, step) for dataset 1 bins. The stop value is inclusive, so (0, 6, 2) would yield three bins with edges (0, 2, 4, 6). If the stop value is not perfectly divisible by the step, the last bin will overshoot the stop value. bins2 : tuple (start, stop, step) for dataset 2 bins. The stop value is inclusive, so (0, 6, 2) would yield three bins with edges (0, 2, 4, 6). If the stop value is not perfectly divisible by the step, the last bin will overshoot the stop value. sites : list | slice, optional Subset of sites to extract, by default None or all sites (sites is synonymous with gids aka spatial indices) max_workers : None | int, optional Number of workers to use, if 1 run in serial, if None use all available cores, by default None chunks_per_worker : int, optional Number of chunks to extract on each worker, by default 5 Returns ------- jpd: pandas.DataFrame DataFrame of joint probability distribution between given datasets with given bins """ if max_workers is None: max_workers = os.cpu_count() slices = self._get_slices(dset1, dset2, sites, chunks_per_slice=chunks_per_worker) if len(slices) == 1: max_workers = 1 jpd = {} if max_workers > 1: msg = ('Computing the joint probability distribution between {} ' 'and {} in parallel using {} workers' .format(dset1, dset2, max_workers)) loggers = [__name__, 'rex'] with SpawnProcessPool(max_workers=max_workers, loggers=loggers) as exe: futures = [] for sites_slice in slices: future = exe.submit(self.compute_joint_pd, self.res_h5, dset1, dset2, bins1, bins2, res_cls=self.res_cls, hsds=self._hsds, sites_slice=sites_slice) futures.append(future) for i, future in enumerate(as_completed(futures)): jpd.update(future.result()) logger.debug('Completed {} out of {} workers' .format((i + 1), len(futures))) else: msg = ('Computing the joint probability distribution between {} ' 'and {} in serial.' .format(dset1, dset2)) for i, sites_slice in enumerate(slices): jpd.update(self.compute_joint_pd( self.res_h5, dset1, dset2, bins1, bins2, res_cls=self.res_cls, hsds=self._hsds, sites_slice=sites_slice)) logger.debug('Completed {} out of {} sets of sites' .format((i + 1), len(slices))) gc.collect() log_mem(logger) bins1 = self._make_bins(*bins1) bins2 = self._make_bins(*bins2) index = np.meshgrid(bins1[:-1], bins2[:-1], indexing='ij') index = np.array(index).T.reshape(-1, 2).astype(np.int16) index = pd.MultiIndex.from_arrays(index.T, names=(dset1, dset2)) jpd = pd.DataFrame({k: v.flatten(order='F') for k, v in jpd.items()}, index=index).sort_index(axis=1) return jpd
[docs] def save(self, jpd, out_fpath): """ Save joint probability distribution to disk Parameters ---------- jpd : pandas.DataFrame Table of joint probability distribution densities to save out_fpath : str .csv, or .h5 file path to save joint probability distribution to """ with self.res_cls(self.res_h5) as f: meta = f['meta', jpd.columns.values]'Writing joint probability distribution to {}' .format(out_fpath)) if out_fpath.endswith('.csv'): jpd.to_csv(out_fpath) meta_fpath = out_fpath.split('.')[0] + '_meta.csv' if os.path.exists(meta_fpath): msg = ("Site meta data already exists at {}!") logger.warning(msg) warn(msg) else: logger.debug('Writing site meta data to {}' .format(meta_fpath)) meta.to_csv(meta_fpath, index=False) elif out_fpath.endswith('.h5'): with h5py.File(out_fpath, mode='w') as f: f.attrs['rex version'] = __version__ for i, n in enumerate(jpd.index.names):'') data = np.array(jpd.index.get_level_values(i)) dset = '{}-bins'.format(n) logger.debug('Writing {}'.format(dset)) f.create_dataset(dset, shape=data.shape, dtype=data.dtype, data=data) logger.debug('Writing joint probability density values to jpd') data = jpd.values f.create_dataset('jpd', shape=data.shape, dtype=data.dtype, data=data) logger.debug('Writing site meta data to meta') meta = to_records_array(meta) f.create_dataset('meta', shape=meta.shape, dtype=meta.dtype, data=meta) else: msg = ("Cannot save joint probability distribution, expecting " ".csv or .h5 path, but got: {}".format(out_fpath)) logger.error(msg) raise OSError(msg)
[docs] @staticmethod def plot_joint_pd(jpd, site=None, **kwargs): """ Plot the mean joint probability distribution accross all sites (site=None), or the distribution for the single given site Parameters ---------- jpd: pandas.DataFrame DataFrame of joint probability distribution between given datasets with given bins site : int, optional Site to plot distribution for, if None plot mean distribution across all sites, by default None """ x, y = jpd.index.names if site is not None: msg = ("Can only plot the joint probabilty distribution for a " "single site or the mean probability distribution accross " "all sites (site=None), you provided: {}".format(site)) assert isinstance(site), msg plt = jpd.loc[:, [site]].reset_index() else: site = 'mean_jpd' plt = jpd.mean(axis=1) = site plt = plt.reset_index() plt.plot.scatter(x=x, y=y, c=site, **kwargs)
[docs] @classmethod def run(cls, res_h5, dset1, dset2, bins1, bins2, sites=None, res_cls=Resource, hsds=False, max_workers=None, chunks_per_worker=5, out_fpath=None): """ Compute joint probability distribution between given datasets using given bins Parameters ---------- res_h5 : str Path to resource h5 file(s) dset1 : str Dataset 1 to generate joint probability distribution for dset2 : str Dataset 2 to generate joint probabilty distribution for bins1 : tuple (start, stop, step) for dataset 1 bins. The stop value is inclusive, so (0, 6, 2) would yield three bins with edges (0, 2, 4, 6). If the stop value is not perfectly divisible by the step, the last bin will overshoot the stop value. bins2 : tuple (start, stop, step) for dataset 2 bins. The stop value is inclusive, so (0, 6, 2) would yield three bins with edges (0, 2, 4, 6). If the stop value is not perfectly divisible by the step, the last bin will overshoot the stop value. sites : list | slice, optional Subset of sites to extract, by default None or all sites (sites is synonymous with gids aka spatial indices) res_cls : Class, optional Resource class to use to access res_h5, by default Resource hsds : bool, optional Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS behind HSDS, by default False max_workers : None | int, optional Number of workers to use, if 1 run in serial, if None use all available cores, by default None chunks_per_worker : int, optional Number of chunks to extract on each worker, by default 5 out_fpath : str, optional .csv, or .h5 file path to save joint probability distribution to Returns ------- out : pandas.DataFrame DataFrame of joint probability distribution between given datasets with given bins """'Computing joint probability distribution between {} and ' '{} in {}' .format(dset1, dset2, res_h5)) logger.debug('Computing joint probability distribution using:' '\n-{} bins: {}' '\n-{} bins: {}' '\n-max workers: {}' '\n-chunks per worker: {}' .format(dset1, bins1, dset2, bins2, max_workers, chunks_per_worker)) jpd = cls(res_h5, res_cls=res_cls, hsds=hsds) out = jpd.compute(dset1, dset2, bins1, bins2, sites=sites, max_workers=max_workers, chunks_per_worker=chunks_per_worker) if out_fpath is not None:, out_fpath) return out
[docs] @classmethod def wind_rose(cls, wind_h5, hub_height, wspd_bins=(0, 30, 1), wdir_bins=(0, 360, 5), sites=None, res_cls=WindResource, hsds=False, max_workers=None, chunks_per_worker=5, out_fpath=None): """ Compute wind rose at given hub height Parameters ---------- wind_h5 : str Path to resource h5 file(s) hub_height : str | int Hub-height to compute wind rose at wspd_bins : tuple (start, stop, step) for wind speed bins wdir_bins : tuple (start, stop, step) for wind direction bins sites : list | slice, optional Subset of sites to extract, by default None or all sites (sites is synonymous with gids aka spatial indices) res_cls : Class, optional Resource class to use to access wind_h5, by default Resource hsds : bool, optional Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS behind HSDS, by default False max_workers : None | int, optional Number of workers to use, if 1 run in serial, if None use all available cores, by default None chunks_per_worker : int, optional Number of chunks to extract on each worker, by default 5 out_fpath : str, optional .csv, or .h5 file path to save wind rose to Returns ------- wind_rose : pandas.DataFrame DataFrame of wind rose frequencies at desired hub-height """'Computing wind rose for {}m wind in {}' .format(hub_height, wind_h5)) logger.debug('Computing wind rose using:' '\n-wind speed bins: {}' '\n-wind direction bins: {}' '\n-max workers: {}' '\n-chunks per worker: {}' .format(wspd_bins, wdir_bins, max_workers, chunks_per_worker)) wind_rose = cls(wind_h5, res_cls=res_cls, hsds=hsds) wspd_dset = 'windspeed_{}m'.format(hub_height) wdir_dset = 'winddirection_{}m'.format(hub_height) out = wind_rose.compute(wspd_dset, wdir_dset, wspd_bins, wdir_bins, sites=sites, max_workers=max_workers, chunks_per_worker=chunks_per_worker) if out_fpath is not None:, out_fpath) return out