Source code for reVX.wind_dirs.mean_wind_dirs

# -*- coding: utf-8 -*-
"""
Compute the mean wind direction for each supply curve point
"""
import logging
import numpy as np

from reV.supply_curve.aggregation import Aggregation, AggFileHandler
from reV.supply_curve.extent import SupplyCurveExtent
from reV.utilities.exceptions import EmptySupplyCurvePointError
from reVX.wind_dirs.mean_wind_dirs_point import MeanWindDirectionsPoint
from reVX.utilities.utilities import log_versions
from rex.utilities.loggers import log_mem

logger = logging.getLogger(__name__)


[docs]class MeanWindDirections(Aggregation): """ Average the wind direction via the wind vectors. Then convert to equivalent sc_point_gid """ def __init__(self, excl_fpath, wdir_dsets, tm_dset='techmap_wtk', excl_dict=None, area_filter_kernel='queen', min_area=None, resolution=128, excl_area=None): """ Parameters ---------- excl_fpath : str Filepath to exclusions h5 with techmap dataset. wdir_dsets : str | list Wind direction dataset to average tm_dset : str, optional Dataset name in the techmap file containing the exclusions-to-resource mapping data, by default 'techmap_wtk' excl_dict : dict | None, optional Dictionary of exclusion LayerMask arugments {layer: {kwarg: value}} by default None area_filter_kernel : str, optional Contiguous area filter method to use on final exclusions mask, by default 'queen' min_area : float | None, optional Minimum required contiguous area filter in sq-km, by default None resolution : int | None, optional SC resolution, must be input in combination with gid, by default 128 excl_area : float | None, optional Area of an exclusion pixel in km2. None will try to infer the area from the profile transform attribute in excl_fpath, by default None """ log_versions(logger) if isinstance(wdir_dsets, str): wdir_dsets = [wdir_dsets] for dset in wdir_dsets: if not dset.startswith('winddirection'): msg = ('{} is not a valid wind direction dataset!' .format(dset)) logger.error(msg) raise ValueError(msg) super().__init__(excl_fpath, tm_dset, *wdir_dsets, excl_dict=excl_dict, area_filter_kernel=area_filter_kernel, min_area=min_area, resolution=resolution, excl_area=excl_area) # pylint: disable=unused-argument
[docs] @classmethod def run_serial(cls, excl_fpath, h5_fpath, tm_dset, *wind_dir_dset, excl_dict=None, inclusion_mask=None, area_filter_kernel='queen', min_area=None, resolution=128, excl_area=0.0081, gids=None, gen_index=None, **kwargs): """ Standalone method to aggregate - can be parallelized. Parameters ---------- excl_fpath : str | list | tuple Filepath to exclusions h5 with techmap dataset (can be one or more filepaths). h5_fpath : str Filepath to .h5 file to aggregate tm_dset : str Dataset name in the techmap file containing the exclusions-to-resource mapping data. wind_dir_dset : str Wind directions to aggreate, can supply multiple datasets excl_dict : dict, optional Dictionary of exclusion LayerMask arugments {layer: {kwarg: value}} by default None area_filter_kernel : str, optional Contiguous area filter method to use on final exclusions mask, by default "queen" min_area : float, optional Minimum required contiguous area filter in sq-km, by default None resolution : int, optional SC resolution, must be input in combination with gid. Prefered option is to use the row/col slices to define the SC point instead, by default 0.0081 excl_area : float, optional Area of an exclusion pixel in km2. None will try to infer the area from the profile transform attribute in excl_fpath, by default None gids : list, optional List of gids to get summary for (can use to subset if running in parallel), or None for all gids in the SC extent, by default None gen_index : np.ndarray, optional Array of generation gids with array index equal to resource gid. Array value is -1 if the resource index was not used in the generation run, by default None kwargs : dict Unused kwargs from Aggregation.run_serial method, namely agg_method Returns ------- agg_out : dict Aggregated values for each aggregation dataset """ with SupplyCurveExtent(excl_fpath, resolution=resolution) as sc: exclusion_shape = sc.exclusions.shape if gids is None: gids = sc.valid_sc_points(tm_dset) elif np.issubdtype(type(gids), np.number): gids = [gids] slice_lookup = sc.get_slice_lookup(gids) cls._check_inclusion_mask(inclusion_mask, gids, exclusion_shape) # pre-extract handlers so they are not repeatedly initialized file_kwargs = {'excl_dict': excl_dict, 'area_filter_kernel': area_filter_kernel, 'min_area': min_area} dsets = wind_dir_dset + ('meta', ) agg_out = {ds: [] for ds in dsets} with AggFileHandler(excl_fpath, h5_fpath, **file_kwargs) as fh: n_finished = 0 for gid in gids: gid_inclusions = cls._get_gid_inclusion_mask( inclusion_mask, gid, slice_lookup, resolution=resolution) try: gid_out = MeanWindDirectionsPoint.run( gid, fh.exclusions, fh.h5, tm_dset, *wind_dir_dset, excl_dict=excl_dict, inclusion_mask=gid_inclusions, resolution=resolution, excl_area=excl_area, exclusion_shape=exclusion_shape, close=False, gen_index=gen_index) except EmptySupplyCurvePointError: logger.debug('SC gid {} is fully excluded or does not ' 'have any valid source data!'.format(gid)) except Exception: logger.exception('SC gid {} failed!'.format(gid)) raise else: n_finished += 1 logger.debug('Serial aggregation: ' '{} out of {} points complete' .format(n_finished, len(gids))) log_mem(logger) for k, v in gid_out.items(): agg_out[k].append(v) return agg_out
[docs] def aggregate(self, res_h5_fpath, max_workers=None, sites_per_worker=1000): """ Average wind directions to sc_points Parameters ---------- res_h5_fpath : str Filepath to .h5 file containing wind direction data max_workers : int | None Number of cores to run summary on. None is all available cpus. sites_per_worker : int, optional Number of SC points to process on a single parallel worker, by default 1000 Returns ------- agg : dict Aggregated values for each aggregation dataset """ agg = super().aggregate(res_h5_fpath, max_workers=max_workers, sites_per_worker=sites_per_worker) return agg
[docs] @classmethod def run(cls, res_h5_fpath, excl_fpath, wdir_dsets, tm_dset='techmap_wtk', excl_dict=None, area_filter_kernel='queen', min_area=None, resolution=128, excl_area=None, max_workers=None, sites_per_worker=1000, out_fpath=None): """ Aggregate powerrose to supply curve points, find neighboring supply curve point gids and rank them based on prominent powerrose direction Parameters ---------- res_h5_fpath : str Filepath to .h5 file containing wind direction data excl_fpath : str Filepath to exclusions h5 with techmap dataset. wdir_dsets : str | list Wind direction dataset to average tm_dset : str, optional Dataset name in the techmap file containing the exclusions-to-resource mapping data, by default 'techmap_wtk' excl_dict : dict | None, optional Dictionary of exclusion LayerMask arugments {layer: {kwarg: value}} by default None area_filter_kernel : str, optional Contiguous area filter method to use on final exclusions mask, by default 'queen' min_area : float | None, optional Minimum required contiguous area filter in sq-km, by default None resolution : int | None, optional SC resolution, must be input in combination with gid, by default 128 excl_area : float | None, optional Area of an exclusion pixel in km2. None will try to infer the area from the profile transform attribute in excl_fpath, by default None max_workers : int | None, optional Number of cores to run summary on. None is all available cpus, by default None sites_per_worker : int, optional Number of SC points to process on a single parallel worker, by default 1000 out_fpath : str Path to .h5 file to save aggregated data too Returns ------- agg : dict Aggregated values for each aggregation dataset """ wdir = cls(excl_fpath, wdir_dsets, tm_dset=tm_dset, excl_dict=excl_dict, area_filter_kernel=area_filter_kernel, min_area=min_area, resolution=resolution, excl_area=excl_area) agg = wdir.aggregate(res_h5_fpath, max_workers=max_workers, sites_per_worker=sites_per_worker) if out_fpath is not None: wdir.save_agg_to_h5(res_h5_fpath, out_fpath, agg) return agg