Source code for floris.parallel_floris_model

# Copyright 2022 Shell
import copy
import warnings
from time import perf_counter as timerpc

import numpy as np
import pandas as pd

from floris.floris_model import FlorisModel
from floris.logging_manager import LoggingManager
from floris.optimization.yaw_optimization.yaw_optimizer_sr import YawOptimizationSR
from floris.uncertain_floris_model import map_turbine_powers_uncertain, UncertainFlorisModel


def _get_turbine_powers_serial(fmodel_information, yaw_angles=None):
    fmodel = FlorisModel(fmodel_information)
    fmodel.set(yaw_angles=yaw_angles)
    fmodel.run()
    return (fmodel.get_turbine_powers(), fmodel.core.flow_field)


def _optimize_yaw_angles_serial(
    fmodel_information,
    minimum_yaw_angle,
    maximum_yaw_angle,
    yaw_angles_baseline,
    x0,
    Ny_passes,
    turbine_weights,
    exclude_downstream_turbines,
    verify_convergence,
    print_progress,
):
    fmodel_opt = FlorisModel(fmodel_information)
    yaw_opt = YawOptimizationSR(
        fmodel=fmodel_opt,
        minimum_yaw_angle=minimum_yaw_angle,
        maximum_yaw_angle=maximum_yaw_angle,
        yaw_angles_baseline=yaw_angles_baseline,
        x0=x0,
        Ny_passes=Ny_passes,
        turbine_weights=turbine_weights,
        exclude_downstream_turbines=exclude_downstream_turbines,
        verify_convergence=verify_convergence,
    )

    # Perform optimization but silence print statements to avoid cluttering
    df_opt = yaw_opt.optimize(print_progress=print_progress)
    return df_opt


[docs] class ParallelFlorisModel(LoggingManager): def __init__( self, fmodel, max_workers, n_wind_condition_splits, interface="multiprocessing", # Options are 'multiprocessing', 'mpi4py' or 'concurrent' use_mpi4py=None, propagate_flowfield_from_workers=False, print_timings=False ): """A wrapper around the nominal floris_interface class that adds parallel computing to common FlorisModel properties. Args: fmodel (FlorisModel or UncertainFlorisModel object): Interactive FLORIS object used to perform the wake and turbine calculations. Can either be a regular FlorisModel object or can be an UncertainFlorisModel object. max_workers (int): Number of parallel workers, typically equal to the number of cores you have on your system or HPC. n_wind_condition_splits (int): Number of sectors to split the wind findex array over. This is typically equal to max_workers, or a multiple of it. interface (str): Parallel computing interface to leverage. Recommended is 'concurrent' or 'multiprocessing' for local (single-system) use, and 'mpi4py' for high performance computing on multiple nodes. Defaults to 'multiprocessing'. use_mpi4py (bool): Deprecated option to enable/disable the usage of 'mpi4py'. This option has been superseded by 'interface'. propagate_flowfield_from_workers (bool): By enabling this, the flow field from every floris object (one for each worker) is exported, combined and sent back to the main module. This is slow so unless it's needed, it's recommended to be disabled. Defaults to False. print_timings (bool): Print the computation time to the console. Defaults to False. """ # Set defaults for backward compatibility if use_mpi4py is not None: warnings.warn( "The option 'mpi4py' will be removed in a future version. " "Please use the option 'interface'." ) if use_mpi4py: interface = "mpi4py" else: interface = "multiprocessing" if interface == "mpi4py": import mpi4py.futures as mp self._PoolExecutor = mp.MPIPoolExecutor elif interface == "multiprocessing": import multiprocessing as mp self._PoolExecutor = mp.Pool if max_workers is None: max_workers = mp.cpu_count() elif interface == "concurrent": from concurrent.futures import ProcessPoolExecutor self._PoolExecutor = ProcessPoolExecutor else: raise UserWarning( f"Interface '{interface}' not recognized. " "Please use 'concurrent', 'multiprocessing' or 'mpi4py'." ) # Initialize floris object and copy common properties if isinstance(fmodel, FlorisModel): self.fmodel = fmodel.copy() self._is_uncertain = False elif isinstance(fmodel, UncertainFlorisModel): self.fmodel = fmodel.fmodel_expanded.copy() self._is_uncertain = True self._weights = fmodel.weights self._n_unexpanded = fmodel.n_unexpanded self._n_sample_points = fmodel.n_sample_points self._map_to_expanded_inputs = fmodel.map_to_expanded_inputs self.core = self.fmodel.core # Static copy as a placeholder # Save to self self._n_wind_condition_splits = n_wind_condition_splits # Save initial user input self._max_workers = max_workers # Save initial user input self.n_wind_condition_splits = int( np.min([n_wind_condition_splits, self.fmodel.core.flow_field.n_findex]) ) self.max_workers = int( np.min([max_workers, self.n_wind_condition_splits]) ) self.propagate_flowfield_from_workers = propagate_flowfield_from_workers self.interface = interface self.print_timings = print_timings
[docs] def copy(self): # Make an independent copy self_copy = copy.deepcopy(self) self_copy.fmodel = self.fmodel.copy() return self_copy
[docs] def set( self, wind_speeds=None, wind_directions=None, wind_shear=None, wind_veer=None, reference_wind_height=None, turbulence_intensities=None, air_density=None, layout=None, layout_x=None, layout_y=None, turbine_type=None, solver_settings=None, ): """Pass to the FlorisModel set function. To allow users to directly replace a FlorisModel object with this UncertainFlorisModel object, this function is required.""" if layout is not None: msg = "Use the `layout_x` and `layout_y` parameters in place of `layout` " msg += "because the `layout` parameter will be deprecated in 3.3." self.logger.warning(msg) layout_x = layout[0] layout_y = layout[1] # Just passes arguments to the floris object fmodel = self.fmodel.copy() fmodel.set( wind_speeds=wind_speeds, wind_directions=wind_directions, wind_shear=wind_shear, wind_veer=wind_veer, reference_wind_height=reference_wind_height, turbulence_intensities=turbulence_intensities, air_density=air_density, layout_x=layout_x, layout_y=layout_y, turbine_type=turbine_type, solver_settings=solver_settings, ) # Reinitialize settings self.__init__( fmodel=fmodel, max_workers=self._max_workers, n_wind_condition_splits=self._n_wind_condition_splits, interface=self.interface, propagate_flowfield_from_workers=self.propagate_flowfield_from_workers, print_timings=self.print_timings, )
def _preprocessing(self, yaw_angles=None): # Format yaw angles if yaw_angles is None: yaw_angles = np.zeros(( self.fmodel.core.flow_field.n_findex, self.fmodel.core.farm.n_turbines )) # Prepare settings n_wind_condition_splits = self.n_wind_condition_splits n_wind_condition_splits = np.min( [n_wind_condition_splits, self.fmodel.core.flow_field.n_findex] ) # Prepare the input arguments for parallel execution fmodel_dict = self.fmodel.core.as_dict() wind_condition_id_splits = np.array_split( np.arange(self.fmodel.core.flow_field.n_findex), n_wind_condition_splits, ) multiargs = [] for wc_id_split in wind_condition_id_splits: # for ws_id_split in wind_speed_id_splits: fmodel_dict_split = copy.deepcopy(fmodel_dict) wind_directions = self.fmodel.core.flow_field.wind_directions[wc_id_split] wind_speeds = self.fmodel.core.flow_field.wind_speeds[wc_id_split] turbulence_intensities = self.fmodel.core.flow_field.turbulence_intensities[wc_id_split] yaw_angles_subset = yaw_angles[wc_id_split[0]:wc_id_split[-1]+1, :] fmodel_dict_split["flow_field"]["wind_directions"] = wind_directions fmodel_dict_split["flow_field"]["wind_speeds"] = wind_speeds fmodel_dict_split["flow_field"]["turbulence_intensities"] = turbulence_intensities # Prepare lightweight data to pass along multiargs.append((fmodel_dict_split, yaw_angles_subset)) return multiargs # Function to merge subsets in dictionaries def _merge_subsets(self, field, subset): i, j, k = np.shape(subset) subset_reshape = np.reshape(subset, (i*j, k)) return [eval("f.{:s}".format(field) for f in subset_reshape)] def _postprocessing(self, output): # Split results power_subsets = [p[0] for p in output] flowfield_subsets = [p[1] for p in output] # Retrieve and merge turbine power productions turbine_powers = np.concatenate(power_subsets, axis=0) # Optionally, also merge flow field dictionaries from individual floris solutions if self.propagate_flowfield_from_workers: self.core = self.fmodel.core # Refresh static copy of underlying floris class # self.core.flow_field.u_initial = self._merge_subsets("u_initial", flowfield_subsets) # self.core.flow_field.v_initial = self._merge_subsets("v_initial", flowfield_subsets) # self.core.flow_field.w_initial = self._merge_subsets("w_initial", flowfield_subsets) self.core.flow_field.u = self._merge_subsets("u", flowfield_subsets) self.core.flow_field.v = self._merge_subsets("v", flowfield_subsets) self.core.flow_field.w = self._merge_subsets("w", flowfield_subsets) self.core.flow_field.turbulence_intensity_field = self._merge_subsets( "turbulence_intensity_field", flowfield_subsets ) return turbine_powers
[docs] def run(self): raise UserWarning( "'run' not supported on ParallelFlorisModel. Please use " "'get_turbine_powers' or 'get_farm_power' directly." )
[docs] def get_turbine_powers(self, yaw_angles=None): # Retrieve multiargs: preprocessing t0 = timerpc() multiargs = self._preprocessing(yaw_angles) t_preparation = timerpc() - t0 # Perform parallel calculation t1 = timerpc() with self._PoolExecutor(self.max_workers) as p: if (self.interface == "mpi4py") or (self.interface == "multiprocessing"): out = p.starmap(_get_turbine_powers_serial, multiargs) else: out = p.map( _get_turbine_powers_serial, [j[0] for j in multiargs], [j[1] for j in multiargs] ) # out = list(out) t_execution = timerpc() - t1 # Postprocessing: merge power production (and opt. flow field) from individual runs t2 = timerpc() turbine_powers = self._postprocessing(out) if self._is_uncertain: turbine_powers = map_turbine_powers_uncertain( unique_turbine_powers=turbine_powers, map_to_expanded_inputs=self._map_to_expanded_inputs, weights=self._weights, n_unexpanded=self._n_unexpanded, n_sample_points=self._n_sample_points, n_turbines=self.fmodel.core.farm.n_turbines, ) t_postprocessing = timerpc() - t2 t_total = timerpc() - t0 if self.print_timings: print("===============================================================================") print( "Total time spent for parallel calculation " f"({self.max_workers} workers): {t_total:.3f} s" ) print(f" Time spent in parallel preprocessing: {t_preparation:.3f} s") print(f" Time spent in parallel loop execution: {t_execution:.3f} s.") print(f" Time spent in parallel postprocessing: {t_postprocessing:.3f} s") return turbine_powers
[docs] def get_farm_power(self, yaw_angles=None, turbine_weights=None): if turbine_weights is None: # Default to equal weighing of all turbines when turbine_weights is None turbine_weights = np.ones( ( (self._n_unexpanded if self._is_uncertain else self.fmodel.core.flow_field.n_findex), self.fmodel.core.farm.n_turbines ) ) elif len(np.shape(turbine_weights)) == 1: # Deal with situation when 1D array is provided turbine_weights = np.tile( turbine_weights, ( (self._n_unexpanded if self._is_uncertain else self.fmodel.core.flow_field.n_findex), 1 ) ) # Calculate all turbine powers and apply weights turbine_powers = self.get_turbine_powers(yaw_angles=yaw_angles) turbine_powers = np.multiply(turbine_weights, turbine_powers) return np.sum(turbine_powers, axis=1)
[docs] def get_farm_AEP( self, freq, cut_in_wind_speed=None, cut_out_wind_speed=None, yaw_angles=None, turbine_weights=None, no_wake=False, ) -> float: """ Estimate annual energy production (AEP) for distributions of wind speed, wind direction, frequency of occurrence, and yaw offset. Args: freq (NDArrayFloat): NumPy array with shape (n_wind_directions, n_wind_speeds) with the frequencies of each wind direction and wind speed combination. These frequencies should typically sum up to 1.0 and are used to weigh the wind farm power for every condition in calculating the wind farm's AEP. cut_in_wind_speed (float, optional): No longer supported. cut_out_wind_speed (float, optional): No longer supported. yaw_angles (NDArrayFloat | list[float] | None, optional): The relative turbine yaw angles in degrees. If None is specified, will assume that the turbine yaw angles are all zero degrees for all conditions. Defaults to None. turbine_weights (NDArrayFloat | list[float] | None, optional): weighing terms that allow the user to emphasize power at particular turbines and/or completely ignore the power from other turbines. This is useful when, for example, you are modeling multiple wind farms in a single floris object. If you only want to calculate the power production for one of those farms and include the wake effects of the neighboring farms, you can set the turbine_weights for the neighboring farms' turbines to 0.0. The array of turbine powers from floris is multiplied with this array in the calculation of the objective function. If None, this is an array with all values 1.0 and with shape equal to (n_wind_directions, n_wind_speeds, n_turbines). Defaults to None. no_wake: (bool, optional): When *True* updates the turbine quantities without calculating the wake or adding the wake to the flow field. This can be useful when quantifying the loss in AEP due to wakes. Defaults to *False*. Returns: float: The Annual Energy Production (AEP) for the wind farm in watt-hours. """ # If no_wake==True, ignore parallelization because it's fast enough if no_wake: return self.fmodel.get_farm_AEP( freq=freq, cut_in_wind_speed=cut_in_wind_speed, cut_out_wind_speed=cut_out_wind_speed, yaw_angles=yaw_angles, turbine_weights=turbine_weights, no_wake=no_wake ) # Verify dimensions of the variable "freq" if ((self._is_uncertain and np.shape(freq)[0] != self._n_unexpanded) or (not self._is_uncertain and np.shape(freq)[0] != self.fmodel.core.flow_field.n_findex)): raise UserWarning( "'freq' should be a one-dimensional array with dimensions (n_findex). " f"Given shape is {np.shape(freq)}" ) # Check if frequency vector sums to 1.0. If not, raise a warning if np.abs(np.sum(freq) - 1.0) > 0.001: self.logger.warning( "WARNING: The frequency array provided to get_farm_AEP() does not sum to 1.0." ) # Copy the full wind speed array from the floris object and initialize # the the farm_power variable as an empty array. wind_speeds = np.array(self.fmodel.core.flow_field.wind_speeds, copy=True) wind_directions = np.array(self.fmodel.core.flow_field.wind_directions, copy=True) turbulence_intensities = np.array( self.fmodel.core.flow_field.turbulence_intensities, copy=True, ) farm_power = np.zeros( self._n_unexpanded if self._is_uncertain else self.core.flow_field.n_findex ) # Determine which wind speeds we must evaluate in floris if cut_in_wind_speed is not None or cut_out_wind_speed is not None: raise NotImplementedError( "WARNING: The 'cut_in_wind_speed' and 'cut_out_wind_speed' " "parameters are no longer supported in the 'ParallelFlorisModel.get_farm_AEP' " "method." ) farm_power = ( self.get_farm_power(yaw_angles=yaw_angles, turbine_weights=turbine_weights) ) # Finally, calculate AEP in GWh aep = np.nansum(np.multiply(freq, farm_power) * 365 * 24) # Reset the FLORIS object to the full wind speed array self.fmodel.set( wind_directions=wind_directions, wind_speeds=wind_speeds, turbulence_intensities=turbulence_intensities, ) return aep
[docs] def optimize_yaw_angles( self, minimum_yaw_angle=-25.0, maximum_yaw_angle=25.0, yaw_angles_baseline=None, x0=None, Ny_passes=[5,4], turbine_weights=None, exclude_downstream_turbines=True, verify_convergence=False, print_worker_progress=False, # Recommended disabled to avoid clutter. Useful for debugging ): # Prepare the inputs to each core for multiprocessing module t0 = timerpc() multiargs = self._preprocessing() for ii in range(len(multiargs)): multiargs[ii] = ( multiargs[ii][0], minimum_yaw_angle, maximum_yaw_angle, yaw_angles_baseline, x0, Ny_passes, turbine_weights, exclude_downstream_turbines, verify_convergence, print_worker_progress, ) t1 = timerpc() # Optimize yaw angles using parallel processing print("Optimizing yaw angles with {:d} workers.".format(self.max_workers)) with self._PoolExecutor(self.max_workers) as p: if (self.interface == "mpi4py") or (self.interface == "multiprocessing"): df_opt_splits = p.starmap(_optimize_yaw_angles_serial, multiargs) else: df_opt_splits = p.map( _optimize_yaw_angles_serial, [j[0] for j in multiargs], [j[1] for j in multiargs], [j[2] for j in multiargs], [j[3] for j in multiargs], [j[4] for j in multiargs], [j[5] for j in multiargs], [j[6] for j in multiargs], [j[7] for j in multiargs], [j[8] for j in multiargs], [j[9] for j in multiargs], ) t2 = timerpc() # Combine all solutions from multiprocessing into single dataframe df_opt = pd.concat(df_opt_splits, axis=0).reset_index(drop=True).sort_values( by=["wind_direction", "wind_speed", "turbulence_intensity"] ) t3 = timerpc() if self.print_timings: print("===============================================================================") print( "Total time spent for parallel calculation " f"({self.max_workers} workers): {t3 - t0:.3f} s" ) print(" Time spent in parallel preprocessing: {:.3f} s".format(t1 - t0)) print(" Time spent in parallel loop execution: {:.3f} s.".format(t2 - t1)) print(" Time spent in parallel postprocessing: {:.3f} s".format(t3 - t2)) return df_opt
@property def layout_x(self): return self.fmodel.layout_x @property def layout_y(self): return self.fmodel.layout_y @property def wind_speeds(self): return self.fmodel.wind_speeds @property def wind_directions(self): return self.fmodel.wind_directions @property def turbulence_intensities(self): return self.fmodel.turbulence_intensities @property def n_findex(self): return self.fmodel.n_findex @property def n_turbines(self): return self.fmodel.n_turbines
# @property # def floris(self): # return self.fmodel.core