Source code for floris.core.turbine.operation_models


from __future__ import annotations

import copy
from abc import abstractmethod
from typing import (
    Any,
    Dict,
    Final,
)

import numpy as np
from attrs import define, field
from scipy.interpolate import interp1d

from floris.core import BaseClass
from floris.core.rotor_velocity import (
    average_velocity,
    compute_tilt_angles_for_floating_turbines,
    rotor_velocity_air_density_correction,
    rotor_velocity_tilt_cosine_correction,
    rotor_velocity_yaw_cosine_correction,
)
from floris.type_dec import (
    NDArrayFloat,
    NDArrayObject,
)
from floris.utilities import cosd


POWER_SETPOINT_DEFAULT = 1e12
POWER_SETPOINT_DISABLED = 0.001


[docs] @define class BaseOperationModel(BaseClass): """ Base class for turbine operation models. All turbine operation models must implement static power(), thrust_coefficient(), and axial_induction() methods, which are called by power() and thrust_coefficient() through the interface in the turbine.py module. Args: BaseClass (_type_): _description_ Raises: NotImplementedError: _description_ NotImplementedError: _description_ """
[docs] @staticmethod @abstractmethod def power() -> None: raise NotImplementedError("BaseOperationModel.power")
[docs] @staticmethod @abstractmethod def thrust_coefficient() -> None: raise NotImplementedError("BaseOperationModel.thrust_coefficient")
[docs] @staticmethod @abstractmethod def axial_induction() -> None: # TODO: Consider whether we can make a generic axial_induction method # based purely on thrust_coefficient so that we don't need to implement # axial_induciton() in individual operation models. raise NotImplementedError("BaseOperationModel.axial_induction")
[docs] @define class SimpleTurbine(BaseOperationModel): """ Static class defining an actuator disk turbine model that is fully aligned with the flow. No handling for yaw or tilt angles. As with all turbine submodules, implements only static power() and thrust_coefficient() methods, which are called by power() and thrust_coefficient() on turbine.py, respectively. This class is not intended to be instantiated; it simply defines a library of static methods. """
[docs] def power( power_thrust_table: dict, velocities: NDArrayFloat, air_density: float, average_method: str = "cubic-mean", cubature_weights: NDArrayFloat | None = None, **_ # <- Allows other models to accept other keyword arguments ): # Construct power interpolant power_interpolator = interp1d( power_thrust_table["wind_speed"], power_thrust_table["power"], fill_value=0.0, bounds_error=False, ) # Compute the power-effective wind speed across the rotor rotor_average_velocities = average_velocity( velocities=velocities, method=average_method, cubature_weights=cubature_weights, ) rotor_effective_velocities = rotor_velocity_air_density_correction( velocities=rotor_average_velocities, air_density=air_density, ref_air_density=power_thrust_table["ref_air_density"] ) # Compute power power = power_interpolator(rotor_effective_velocities) * 1e3 # Convert to W return power
[docs] def thrust_coefficient( power_thrust_table: dict, velocities: NDArrayFloat, average_method: str = "cubic-mean", cubature_weights: NDArrayFloat | None = None, **_ # <- Allows other models to accept other keyword arguments ): # Construct thrust coefficient interpolant thrust_coefficient_interpolator = interp1d( power_thrust_table["wind_speed"], power_thrust_table["thrust_coefficient"], fill_value=0.0001, bounds_error=False, ) # Compute the effective wind speed across the rotor rotor_average_velocities = average_velocity( velocities=velocities, method=average_method, cubature_weights=cubature_weights, ) # TODO: Do we need an air density correction here? thrust_coefficient = thrust_coefficient_interpolator(rotor_average_velocities) thrust_coefficient = np.clip(thrust_coefficient, 0.0001, 0.9999) return thrust_coefficient
[docs] def axial_induction( power_thrust_table: dict, velocities: NDArrayFloat, average_method: str = "cubic-mean", cubature_weights: NDArrayFloat | None = None, **_ # <- Allows other models to accept other keyword arguments ): thrust_coefficient = SimpleTurbine.thrust_coefficient( power_thrust_table=power_thrust_table, velocities=velocities, average_method=average_method, cubature_weights=cubature_weights, ) return (1 - np.sqrt(1 - thrust_coefficient))/2
[docs] @define class CosineLossTurbine(BaseOperationModel): """ Static class defining an actuator disk turbine model that may be misaligned with the flow. Nonzero tilt and yaw angles are handled via cosine relationships, with the power lost to yawing defined by the cosine of the yaw misalignment raised to the power of cosine_loss_exponent_yaw. This turbine submodel is the default, and matches the turbine model in FLORIS v3. As with all turbine submodules, implements only static power() and thrust_coefficient() methods, which are called by power() and thrust_coefficient() on turbine.py, respectively. This class is not intended to be instantiated; it simply defines a library of static methods. """
[docs] def power( power_thrust_table: dict, velocities: NDArrayFloat, air_density: float, yaw_angles: NDArrayFloat, tilt_angles: NDArrayFloat, tilt_interp: NDArrayObject, average_method: str = "cubic-mean", cubature_weights: NDArrayFloat | None = None, correct_cp_ct_for_tilt: bool = False, **_ # <- Allows other models to accept other keyword arguments ): # Construct power interpolant power_interpolator = interp1d( power_thrust_table["wind_speed"], power_thrust_table["power"], fill_value=0.0, bounds_error=False, ) # Compute the power-effective wind speed across the rotor rotor_average_velocities = average_velocity( velocities=velocities, method=average_method, cubature_weights=cubature_weights, ) rotor_effective_velocities = rotor_velocity_air_density_correction( velocities=rotor_average_velocities, air_density=air_density, ref_air_density=power_thrust_table["ref_air_density"] ) rotor_effective_velocities = rotor_velocity_yaw_cosine_correction( cosine_loss_exponent_yaw=power_thrust_table["cosine_loss_exponent_yaw"], yaw_angles=yaw_angles, rotor_effective_velocities=rotor_effective_velocities, ) rotor_effective_velocities = rotor_velocity_tilt_cosine_correction( tilt_angles=tilt_angles, ref_tilt=power_thrust_table["ref_tilt"], cosine_loss_exponent_tilt=power_thrust_table["cosine_loss_exponent_tilt"], tilt_interp=tilt_interp, correct_cp_ct_for_tilt=correct_cp_ct_for_tilt, rotor_effective_velocities=rotor_effective_velocities, ) # Compute power power = power_interpolator(rotor_effective_velocities) * 1e3 # Convert to W return power
[docs] def thrust_coefficient( power_thrust_table: dict, velocities: NDArrayFloat, yaw_angles: NDArrayFloat, tilt_angles: NDArrayFloat, tilt_interp: NDArrayObject, average_method: str = "cubic-mean", cubature_weights: NDArrayFloat | None = None, correct_cp_ct_for_tilt: bool = False, **_ # <- Allows other models to accept other keyword arguments ): # Construct thrust coefficient interpolant thrust_coefficient_interpolator = interp1d( power_thrust_table["wind_speed"], power_thrust_table["thrust_coefficient"], fill_value=0.0001, bounds_error=False, ) # Compute the effective wind speed across the rotor rotor_average_velocities = average_velocity( velocities=velocities, method=average_method, cubature_weights=cubature_weights, ) # TODO: Do we need an air density correction here? thrust_coefficient = thrust_coefficient_interpolator(rotor_average_velocities) thrust_coefficient = np.clip(thrust_coefficient, 0.0001, 0.9999) # Apply tilt and yaw corrections # Compute the tilt, if using floating turbines old_tilt_angles = copy.deepcopy(tilt_angles) tilt_angles = compute_tilt_angles_for_floating_turbines( tilt_angles=tilt_angles, tilt_interp=tilt_interp, rotor_effective_velocities=rotor_average_velocities, ) # Only update tilt angle if requested (if the tilt isn't accounted for in the Ct curve) tilt_angles = np.where(correct_cp_ct_for_tilt, tilt_angles, old_tilt_angles) thrust_coefficient = ( thrust_coefficient * cosd(yaw_angles) * cosd(tilt_angles - power_thrust_table["ref_tilt"]) ) return thrust_coefficient
[docs] def axial_induction( power_thrust_table: dict, velocities: NDArrayFloat, yaw_angles: NDArrayFloat, tilt_angles: NDArrayFloat, tilt_interp: NDArrayObject, average_method: str = "cubic-mean", cubature_weights: NDArrayFloat | None = None, correct_cp_ct_for_tilt: bool = False, **_ # <- Allows other models to accept other keyword arguments ): thrust_coefficient = CosineLossTurbine.thrust_coefficient( power_thrust_table=power_thrust_table, velocities=velocities, yaw_angles=yaw_angles, tilt_angles=tilt_angles, tilt_interp=tilt_interp, average_method=average_method, cubature_weights=cubature_weights, correct_cp_ct_for_tilt=correct_cp_ct_for_tilt ) misalignment_loss = cosd(yaw_angles) * cosd(tilt_angles - power_thrust_table["ref_tilt"]) return 0.5 / misalignment_loss * (1 - np.sqrt(1 - thrust_coefficient * misalignment_loss))
[docs] @define class SimpleDeratingTurbine(BaseOperationModel): """ power_thrust_table is a dictionary (normally defined on the turbine input yaml) that contains the parameters necessary to evaluate power(), thrust(), and axial_induction(). Any specific parameters for derating can be placed here. (they can be added to the turbine yaml). For this operation model to receive those arguements, they'll need to be added to the kwargs dictionaries in the respective functions on turbine.py. They won't affect the other operation models. """
[docs] def power( power_thrust_table: dict, velocities: NDArrayFloat, air_density: float, power_setpoints: NDArrayFloat | None, average_method: str = "cubic-mean", cubature_weights: NDArrayFloat | None = None, **_ # <- Allows other models to accept other keyword arguments ): base_powers = SimpleTurbine.power( power_thrust_table=power_thrust_table, velocities=velocities, air_density=air_density, average_method=average_method, cubature_weights=cubature_weights ) if power_setpoints is None: return base_powers else: return np.minimum(base_powers, power_setpoints)
# TODO: would we like special handling of zero power setpoints # (mixed with non-zero values) to speed up computation in that case?
[docs] def thrust_coefficient( power_thrust_table: dict, velocities: NDArrayFloat, air_density: float, power_setpoints: NDArrayFloat, average_method: str = "cubic-mean", cubature_weights: NDArrayFloat | None = None, **_ # <- Allows other models to accept other keyword arguments ): base_thrust_coefficients = SimpleTurbine.thrust_coefficient( power_thrust_table=power_thrust_table, velocities=velocities, average_method=average_method, cubature_weights=cubature_weights ) if power_setpoints is None: return base_thrust_coefficients else: # Assume thrust coefficient scales directly with power base_powers = SimpleTurbine.power( power_thrust_table=power_thrust_table, velocities=velocities, air_density=air_density ) power_fractions = power_setpoints / base_powers thrust_coefficients = power_fractions * base_thrust_coefficients return np.minimum(base_thrust_coefficients, thrust_coefficients)
[docs] def axial_induction( power_thrust_table: dict, velocities: NDArrayFloat, air_density: float, power_setpoints: NDArrayFloat, average_method: str = "cubic-mean", cubature_weights: NDArrayFloat | None = None, **_ # <- Allows other models to accept other keyword arguments ): thrust_coefficient = SimpleDeratingTurbine.thrust_coefficient( power_thrust_table=power_thrust_table, velocities=velocities, air_density=air_density, power_setpoints=power_setpoints, average_method=average_method, cubature_weights=cubature_weights, ) return (1 - np.sqrt(1 - thrust_coefficient))/2
[docs] @define class MixedOperationTurbine(BaseOperationModel):
[docs] def power( yaw_angles: NDArrayFloat, power_setpoints: NDArrayFloat, **kwargs ): # Yaw angles mask all yaw_angles not equal to zero yaw_angles_mask = yaw_angles != 0.0 power_setpoints_mask = power_setpoints < POWER_SETPOINT_DEFAULT neither_mask = np.logical_not(yaw_angles_mask) & np.logical_not(power_setpoints_mask) if (power_setpoints_mask & yaw_angles_mask).any(): raise ValueError(( "Power setpoints and yaw angles are incompatible." "If yaw_angles entry is nonzero, power_setpoints must be greater than" " or equal to {0}.".format(POWER_SETPOINT_DEFAULT) )) powers = np.zeros_like(power_setpoints) powers[yaw_angles_mask] += CosineLossTurbine.power( yaw_angles=yaw_angles, **kwargs )[yaw_angles_mask] powers[power_setpoints_mask] += SimpleDeratingTurbine.power( power_setpoints=power_setpoints, **kwargs )[power_setpoints_mask] powers[neither_mask] += SimpleTurbine.power( **kwargs )[neither_mask] return powers
[docs] def thrust_coefficient( yaw_angles: NDArrayFloat, power_setpoints: NDArrayFloat, **kwargs ): yaw_angles_mask = yaw_angles != 0.0 power_setpoints_mask = power_setpoints < POWER_SETPOINT_DEFAULT neither_mask = np.logical_not(yaw_angles_mask) & np.logical_not(power_setpoints_mask) if (power_setpoints_mask & yaw_angles_mask).any(): raise ValueError(( "Power setpoints and yaw angles are incompatible." "If yaw_angles entry is nonzero, power_setpoints must be greater than" " or equal to {0}.".format(POWER_SETPOINT_DEFAULT) )) thrust_coefficients = np.zeros_like(power_setpoints) thrust_coefficients[yaw_angles_mask] += CosineLossTurbine.thrust_coefficient( yaw_angles=yaw_angles, **kwargs )[yaw_angles_mask] thrust_coefficients[power_setpoints_mask] += SimpleDeratingTurbine.thrust_coefficient( power_setpoints=power_setpoints, **kwargs )[power_setpoints_mask] thrust_coefficients[neither_mask] += SimpleTurbine.thrust_coefficient( **kwargs )[neither_mask] return thrust_coefficients
[docs] def axial_induction( yaw_angles: NDArrayFloat, power_setpoints: NDArrayFloat, **kwargs ): yaw_angles_mask = yaw_angles != 0.0 power_setpoints_mask = power_setpoints < POWER_SETPOINT_DEFAULT neither_mask = np.logical_not(yaw_angles_mask) & np.logical_not(power_setpoints_mask) if (power_setpoints_mask & yaw_angles_mask).any(): raise ValueError(( "Power setpoints and yaw angles are incompatible." "If yaw_angles entry is nonzero, power_setpoints must be greater than" " or equal to {0}.".format(POWER_SETPOINT_DEFAULT) )) axial_inductions = np.zeros_like(power_setpoints) axial_inductions[yaw_angles_mask] += CosineLossTurbine.axial_induction( yaw_angles=yaw_angles, **kwargs )[yaw_angles_mask] axial_inductions[power_setpoints_mask] += SimpleDeratingTurbine.axial_induction( power_setpoints=power_setpoints, **kwargs )[power_setpoints_mask] axial_inductions[neither_mask] += SimpleTurbine.axial_induction( **kwargs )[neither_mask] return axial_inductions
[docs] @define class AWCTurbine(BaseOperationModel): """ power_thrust_table is a dictionary (normally defined on the turbine input yaml) that contains the parameters necessary to evaluate power(), thrust(), and axial_induction(). Feel free to put any Helix tuning parameters into here (they can be added to the turbine yaml). Also, feel free to add any commanded inputs to power(), thrust_coefficient(), or axial_induction(). For this operation model to receive those arguments, they'll need to be added to the kwargs dictionaries in the respective functions on turbine.py. They won't affect the other operation models. """
[docs] def power( power_thrust_table: dict, velocities: NDArrayFloat, air_density: float, awc_modes: str, awc_amplitudes: NDArrayFloat | None, average_method: str = "cubic-mean", cubature_weights: NDArrayFloat | None = None, **_ # <- Allows other models to accept other keyword arguments ): base_powers = SimpleTurbine.power( power_thrust_table=power_thrust_table, velocities=velocities, air_density=air_density, average_method=average_method, cubature_weights=cubature_weights ) if (awc_modes == 'helix').any(): if np.any(np.isclose( base_powers/1000, np.max(power_thrust_table['power']) )): raise UserWarning( 'The selected wind speed is above or near rated wind speed. ' '`AWCTurbine` operation model is not designed ' 'or verified for above-rated conditions.' ) return base_powers * (1 - ( power_thrust_table['helix_power_b'] + power_thrust_table['helix_power_c']*base_powers ) *awc_amplitudes**power_thrust_table['helix_a'] ) # TODO: Should probably add max function here if (awc_modes == 'baseline').any(): return base_powers else: raise UserWarning( 'Active wake mixing strategies other than the `helix` strategy ' 'have not yet been implemented in FLORIS. Returning baseline power.' )
[docs] def thrust_coefficient( power_thrust_table: dict, velocities: NDArrayFloat, awc_modes: str, awc_amplitudes: NDArrayFloat | None, average_method: str = "cubic-mean", cubature_weights: NDArrayFloat | None = None, **_ # <- Allows other models to accept other keyword arguments ): base_thrust_coefficients = SimpleTurbine.thrust_coefficient( power_thrust_table=power_thrust_table, velocities=velocities, average_method=average_method, cubature_weights=cubature_weights ) if (awc_modes == 'helix').any(): return base_thrust_coefficients * (1 - ( power_thrust_table['helix_thrust_b'] + power_thrust_table['helix_thrust_c']*base_thrust_coefficients ) *awc_amplitudes**power_thrust_table['helix_a'] ) if (awc_modes == 'baseline').any(): return base_thrust_coefficients else: raise UserWarning( 'Active wake mixing strategies other than the `helix` strategy ' 'have not yet been implemented in FLORIS. Returning baseline power.' )
[docs] def axial_induction( power_thrust_table: dict, velocities: NDArrayFloat, awc_modes: str, awc_amplitudes: NDArrayFloat, average_method: str = "cubic-mean", cubature_weights: NDArrayFloat | None = None, **_ # <- Allows other models to accept other keyword arguments ): thrust_coefficient = AWCTurbine.thrust_coefficient( power_thrust_table=power_thrust_table, velocities=velocities, awc_modes=awc_modes, awc_amplitudes=awc_amplitudes, average_method=average_method, cubature_weights=cubature_weights, ) return (1 - np.sqrt(1 - thrust_coefficient))/2
[docs] @define class PeakShavingTurbine():
[docs] def power( power_thrust_table: dict, velocities: NDArrayFloat, turbulence_intensities: NDArrayFloat, air_density: float, average_method: str = "cubic-mean", cubature_weights: NDArrayFloat | None = None, **_ # <- Allows other models to accept other keyword arguments ): base_powers = SimpleTurbine.power( power_thrust_table=power_thrust_table, velocities=velocities, air_density=air_density, average_method=average_method, cubature_weights=cubature_weights ) # Get fraction by thrust base_thrust_coefficients = SimpleTurbine.thrust_coefficient( power_thrust_table=power_thrust_table, velocities=velocities, average_method=average_method, cubature_weights=cubature_weights ) peak_shaving_thrust_coefficients = PeakShavingTurbine.thrust_coefficient( power_thrust_table=power_thrust_table, velocities=velocities, turbulence_intensities=turbulence_intensities, average_method=average_method, cubature_weights=cubature_weights ) # Compute equivalent axial inductions base_ais = (1 - np.sqrt(1 - base_thrust_coefficients))/2 peak_shaving_ais = (1 - np.sqrt(1 - peak_shaving_thrust_coefficients))/2 # Power proportion power_fractions = ( (peak_shaving_thrust_coefficients * (1-peak_shaving_ais)) / (base_thrust_coefficients * (1-base_ais)) ) # Apply fraction to power and return powers = power_fractions * base_powers return powers
[docs] def thrust_coefficient( power_thrust_table: dict, velocities: NDArrayFloat, turbulence_intensities: NDArrayFloat, average_method: str = "cubic-mean", cubature_weights: NDArrayFloat | None = None, **_ # <- Allows other models to accept other keyword arguments ): base_thrust_coefficients = SimpleTurbine.thrust_coefficient( power_thrust_table=power_thrust_table, velocities=velocities, average_method=average_method, cubature_weights=cubature_weights ) peak_normal_thrust_prime = np.max( np.array(power_thrust_table["wind_speed"])**2 * np.array(power_thrust_table["thrust_coefficient"]) ) rotor_average_velocities = average_velocity( velocities=velocities, method=average_method, cubature_weights=cubature_weights, ) # Replace zeros with small values to avoid division by zero rotor_average_velocities = np.maximum(rotor_average_velocities, 0.01) max_allowable_thrust_coefficient = ( (1-power_thrust_table["peak_shaving_fraction"]) * peak_normal_thrust_prime / rotor_average_velocities**2 ) # Apply TI mask max_allowable_thrust_coefficient = np.where( ( turbulence_intensities.mean( axis=tuple([2 + i for i in range(turbulence_intensities.ndim - 2)]) ) >= power_thrust_table["peak_shaving_TI_threshold"] ), max_allowable_thrust_coefficient, base_thrust_coefficients ) thrust_coefficient = np.minimum(base_thrust_coefficients, max_allowable_thrust_coefficient) return thrust_coefficient
[docs] def axial_induction( power_thrust_table: dict, velocities: NDArrayFloat, turbulence_intensities: NDArrayFloat, average_method: str = "cubic-mean", cubature_weights: NDArrayFloat | None = None, **_ # <- Allows other models to accept other keyword arguments ): thrust_coefficient = PeakShavingTurbine.thrust_coefficient( power_thrust_table=power_thrust_table, velocities=velocities, turbulence_intensities=turbulence_intensities, average_method=average_method, cubature_weights=cubature_weights, ) return (1 - np.sqrt(1 - thrust_coefficient))/2