from __future__ import annotations
import copy
import inspect
from abc import abstractmethod
from pathlib import Path
from typing import List
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pandas.api.types import CategoricalDtype
from scipy.interpolate import (
LinearNDInterpolator,
NearestNDInterpolator,
RegularGridInterpolator,
)
from floris.heterogeneous_map import HeterogeneousMap
from floris.type_dec import NDArrayFloat
from floris.utilities import (
check_and_identify_step_size,
make_wind_directions_adjacent,
wrap_180,
)
[docs]
class WindDataBase:
"""
Super class that WindRose and TimeSeries inherit from, enforcing the implementation of
unpack() on the child classes and providing the general functions unpack_for_reinitialize() and
unpack_freq().
"""
[docs]
@abstractmethod
def unpack(self):
"""
Placeholder for child classes of WindDataBase, which each need to implement the unpack()
method.
"""
raise NotImplementedError("unpack() not implemented on {0}".format(self.__class__.__name__))
[docs]
def unpack_for_reinitialize(self):
"""
Return only the variables need for FlorisModel.reinitialize
"""
(
wind_directions_unpack,
wind_speeds_unpack,
ti_table_unpack,
_,
_,
heterogeneous_inflow_config,
) = self.unpack()
return (
wind_directions_unpack,
wind_speeds_unpack,
ti_table_unpack,
heterogeneous_inflow_config,
)
[docs]
def unpack_freq(self):
"""Unpack frequency weighting"""
return self.unpack()[3]
[docs]
def unpack_value(self):
"""Unpack values of power generated"""
return self.unpack()[4]
[docs]
def check_heterogeneous_inflow_config(self, heterogeneous_inflow_config):
"""
Check that the heterogeneous_inflow_config dictionary is properly formatted
Args:
heterogeneous_inflow_config (dict): A dictionary containing the following keys:
* 'speed_multipliers': A 2D NumPy array (size n_findex x num_points)
of speed multipliers.
* 'x': A 1D NumPy array (size num_points) of x-coordinates (meters).
* 'y': A 1D NumPy array (size num_points) of y-coordinates (meters).
"""
if heterogeneous_inflow_config is not None:
if not isinstance(heterogeneous_inflow_config, dict):
raise TypeError("heterogeneous_inflow_config_by_wd must be a dictionary")
if "speed_multipliers" not in heterogeneous_inflow_config:
raise ValueError(
"heterogeneous_inflow_config must contain a key 'speed_multipliers'"
)
if "x" not in heterogeneous_inflow_config:
raise ValueError("heterogeneous_inflow_config must contain a key 'x'")
if "y" not in heterogeneous_inflow_config:
raise ValueError("heterogeneous_inflow_config must contain a key 'y'")
[docs]
def set_layout(self, layout_x=None, layout_y=None):
"""
Default implementation the explicitly does nothing. Only WindData objects that depend
on layout need to implement this method.
Included so that FlorisModel can call this method on the WindData object when the layout
is updated.
Args:
layout_x (list, optional): List of x-coordinates of the turbines. Defaults to None.
layout_y (list, optional): List of y-coordinates of the turbines. Defaults to None.
"""
# No operation performed
return None
[docs]
class WindRose(WindDataBase):
"""
The WindRose class is used to drive FLORIS and optimization operations in
which the inflow is characterized by the frequency of binned wind speed and
wind direction values. Turbulence intensities are defined as a function of
wind direction and wind speed.
Args:
wind_directions: NumPy array of wind directions (NDArrayFloat). Must
be evenly spaced and monotonically increasing.
wind_speeds: NumPy array of wind speeds (NDArrayFloat). Must be
evenly spaced and monotonically increasing.
ti_table: Turbulence intensity table for binned wind direction, wind
speed values (float, NDArrayFloat). Can be an array with dimensions
(n_wind_directions, n_wind_speeds) or a single float value. If a
single float value is provided, the turbulence intensity is assumed
to be constant across all wind directions and wind speeds.
freq_table: Frequency table for binned wind direction, wind speed
values (NDArrayFloat, optional). Must have dimension
(n_wind_directions, n_wind_speeds). Defaults to None in which case
uniform frequency of all bins is assumed.
value_table: Value table for binned wind direction, wind
speed values (NDArrayFloat, optional). Must have dimension
(n_wind_directions, n_wind_speeds). Defaults to None in which case
uniform values are assumed. Value can be used to weight power in
each bin to compute the total value of the energy produced
compute_zero_freq_occurrence: Flag indicating whether to compute zero
frequency occurrences (bool, optional). Defaults to False.
heterogeneous_map (HeterogeneousMap, optional): A HeterogeneousMap object to define
background heterogeneous inflow condition as a function
of wind direction and wind speed. Alternatively, a dictionary can be
passed in to define a HeterogeneousMap object. Defaults to None.
heterogeneous_inflow_config_by_wd (dict, optional): A dictionary containing the following
which can be used to define a heterogeneous_map object (note this parameter is kept
for backwards compatibility and is not recommended for use):
* 'x': A 1D NumPy array (size num_points) of x-coordinates (meters).
* 'y': A 1D NumPy array (size num_points) of y-coordinates (meters).
* 'speed_multipliers': A 2D NumPy array (size num_wd (or num_ws) x num_points)
of speed multipliers. If neither wind_directions nor wind_speeds are
defined, then this should be a single row array
* 'wind_directions': A 1D NumPy array (size num_wd) of wind directions (degrees).
Optional.
* 'wind_speeds': A 1D NumPy array (size num_ws) of wind speeds (m/s). Optional.
Defaults to None.
"""
def __init__(
self,
wind_directions: NDArrayFloat,
wind_speeds: NDArrayFloat,
ti_table: float | NDArrayFloat,
freq_table: NDArrayFloat | None = None,
value_table: NDArrayFloat | None = None,
compute_zero_freq_occurrence: bool = False,
heterogeneous_map: HeterogeneousMap | dict | None = None,
heterogeneous_inflow_config_by_wd: dict | None = None,
):
if not isinstance(wind_directions, np.ndarray):
raise TypeError("wind_directions must be a NumPy array")
if not isinstance(wind_speeds, np.ndarray):
raise TypeError("wind_speeds must be a NumPy array")
# Confirm that both wind_directions and wind_speeds are monitonically
# increasing and evenly spaced
if len(wind_directions) > 1:
# Check monotonically increasing
if not np.all(np.diff(wind_directions) > 0):
raise ValueError("wind_directions must be monotonically increasing")
# Check evenly spaced (Function will raise error if not)
check_and_identify_step_size(wind_directions=wind_directions)
if len(wind_speeds) > 1:
# Check monotonically increasing
if not np.all(np.diff(wind_speeds) > 0):
raise ValueError("wind_speeds must be monotonically increasing")
# Check evenly spaced
if not np.allclose(np.diff(wind_speeds), wind_speeds[1] - wind_speeds[0]):
raise ValueError("wind_speeds must be evenly spaced")
# Save the wind speeds and directions
self.wind_directions = wind_directions
self.wind_speeds = wind_speeds
# Check ti_table is a float or a NumPy array
if not isinstance(ti_table, (float, np.ndarray)):
raise TypeError("ti_table must be a float or a NumPy array")
# Check if ti_table is a single float value
if isinstance(ti_table, float):
self.ti_table = np.full((len(wind_directions), len(wind_speeds)), ti_table)
# Otherwise confirm the dimensions and then save it
else:
if not ti_table.shape[0] == len(wind_directions):
raise ValueError("ti_table first dimension must equal len(wind_directions)")
if not ti_table.shape[1] == len(wind_speeds):
raise ValueError("ti_table second dimension must equal len(wind_speeds)")
self.ti_table = ti_table
# If freq_table is not None, confirm it has correct dimension,
# otherwise initialize to uniform probability
if freq_table is not None:
if not freq_table.shape[0] == len(wind_directions):
raise ValueError("freq_table first dimension must equal len(wind_directions)")
if not freq_table.shape[1] == len(wind_speeds):
raise ValueError("freq_table second dimension must equal len(wind_speeds)")
self.freq_table = freq_table
else:
self.freq_table = np.ones((len(wind_directions), len(wind_speeds)))
# Normalize freq table
self.freq_table = self.freq_table / np.sum(self.freq_table)
# If value_table is not None, confirm it has correct dimension,
# otherwise initialize to all ones
if value_table is not None:
if not value_table.shape[0] == len(wind_directions):
raise ValueError("value_table first dimension must equal len(wind_directions)")
if not value_table.shape[1] == len(wind_speeds):
raise ValueError("value_table second dimension must equal len(wind_speeds)")
self.value_table = value_table
# Save whether zero occurrence cases should be computed
# First check if the ti_table contains any nan values (which would occur for example
# if generated by the TimeSeries to WindRose conversion for wind speeds and directions
# that were not present in the original time series) In this case, raise an error
if compute_zero_freq_occurrence:
if np.isnan(self.ti_table).any():
raise ValueError(
"ti_table contains nan values. (This is likely the result of "
" unsed wind speeds and directions in the original time series.)"
" Cannot compute zero frequency occurrences."
)
self.compute_zero_freq_occurrence = compute_zero_freq_occurrence
# Check that heterogeneous_map and heterogeneous_inflow_config_by_wd are not both defined
if heterogeneous_map is not None and heterogeneous_inflow_config_by_wd is not None:
raise ValueError(
"Only one of heterogeneous_map and heterogeneous_inflow_config_by_wd can be"
+ " defined."
)
# If heterogeneous_inflow_config_by_wd is not None, then create a HeterogeneousMap object
# using the dictionary
if heterogeneous_inflow_config_by_wd is not None:
# TODO: In future, add deprecation warning for this parameter here
self.heterogeneous_map = HeterogeneousMap(**heterogeneous_inflow_config_by_wd)
# Else if heterogeneous_map is not None
elif heterogeneous_map is not None:
# If heterogeneous_map is a dictionary, then create a HeterogeneousMap object
if isinstance(heterogeneous_map, dict):
self.heterogeneous_map = HeterogeneousMap(**heterogeneous_map)
# Else if heterogeneous_map is a HeterogeneousMap object, then save it
elif isinstance(heterogeneous_map, HeterogeneousMap):
self.heterogeneous_map = heterogeneous_map
# Else raise an error
else:
raise ValueError(
"heterogeneous_map must be a HeterogeneousMap object or a dictionary."
)
# Else if neither heterogeneous_map nor heterogeneous_inflow_config_by_wd are defined,
# then set heterogeneous_map to None
else:
self.heterogeneous_map = None
# Build the gridded and flatten versions
self._build_gridded_and_flattened_version()
def _build_gridded_and_flattened_version(self):
"""
Given the wind direction and speed array, build the gridded versions
covering all combinations, and then flatten versions which put all
combinations into 1D array
"""
# Gridded wind speed and direction
self.wd_grid, self.ws_grid = np.meshgrid(
self.wind_directions, self.wind_speeds, indexing="ij"
)
# Flat wind speed and direction
self.wd_flat = self.wd_grid.flatten()
self.ws_flat = self.ws_grid.flatten()
# Flat frequency table
self.freq_table_flat = self.freq_table.flatten()
# Flat TI table
self.ti_table_flat = self.ti_table.flatten()
# value table
if self.value_table is not None:
self.value_table_flat = self.value_table.flatten()
else:
self.value_table_flat = None
# Set mask to non-zero frequency cases depending on compute_zero_freq_occurrence
if self.compute_zero_freq_occurrence:
# If computing zero freq occurrences, then this is all True
self.non_zero_freq_mask = [True for i in range(len(self.freq_table_flat))]
else:
self.non_zero_freq_mask = self.freq_table_flat > 0.0
# N_findex should only be the calculated cases
self.n_findex = np.sum(self.non_zero_freq_mask)
[docs]
def unpack(self):
"""
Unpack the flattened versions of the matrices and return the values
accounting for the non_zero_freq_mask
"""
# The unpacked versions start as the flat version of each
wind_directions_unpack = self.wd_flat.copy()
wind_speeds_unpack = self.ws_flat.copy()
freq_table_unpack = self.freq_table_flat.copy()
ti_table_unpack = self.ti_table_flat.copy()
# Now mask thes values according to self.non_zero_freq_mask
wind_directions_unpack = wind_directions_unpack[self.non_zero_freq_mask]
wind_speeds_unpack = wind_speeds_unpack[self.non_zero_freq_mask]
freq_table_unpack = freq_table_unpack[self.non_zero_freq_mask]
ti_table_unpack = ti_table_unpack[self.non_zero_freq_mask]
# Now get unpacked value table
if self.value_table_flat is not None:
value_table_unpack = self.value_table_flat[self.non_zero_freq_mask].copy()
else:
value_table_unpack = None
# If heterogeneous_map is not None, then get the heterogeneous_inflow_config
if self.heterogeneous_map is not None:
heterogeneous_inflow_config = self.heterogeneous_map.get_heterogeneous_inflow_config(
wind_directions=wind_directions_unpack, wind_speeds=wind_speeds_unpack
)
else:
heterogeneous_inflow_config = None
return (
wind_directions_unpack,
wind_speeds_unpack,
ti_table_unpack,
freq_table_unpack,
value_table_unpack,
heterogeneous_inflow_config,
)
[docs]
def aggregate(self, wd_step=None, ws_step=None, inplace=False):
"""
Wrapper for downsample method for backwards compatibility
"""
return self.downsample(wd_step, ws_step, inplace)
[docs]
def downsample(self, wd_step=None, ws_step=None, inplace=False):
"""
Aggregates the wind rose into fewer wind direction and wind speed bins.
It is necessary the wd_step and ws_step passed in are at least as
large as the current wind direction and wind speed steps. If they are
not, the function will raise an error.
The function will return a new WindRose object with the aggregated
wind direction and wind speed bins. If inplace is set to True, the
current WindRose object will be updated with the aggregated bins.
Args:
wd_step: Step size for wind direction resampling (float, optional).
If None, the current step size will be used. Defaults to None.
ws_step: Step size for wind speed resampling (float, optional). If
None, the current step size will be used. Defaults to None.
inplace: Flag indicating whether to update the current WindRose
object when True or return a new WindRose object when False
(bool, optional). Defaults to False.
Returns:
WindRose: Aggregated wind rose based on the provided or default step
sizes. Only returned if inplace = False.
Notes:
- Returns a aggregated version of the wind rose using new `ws_step` and `wd_step`.
- Uses the bin weights feature in TimeSeries to aggregated the wind rose.
- If `ws_step` or `wd_step` is not specified, it uses the current values.
"""
# If ws_step is passed in, confirm is it at least as large as the current step
if ws_step is not None:
if len(self.wind_speeds) >= 2:
current_ws_step = self.wind_speeds[1] - self.wind_speeds[0]
if ws_step < current_ws_step:
raise ValueError(
"ws_step provided must be at least as large as the current ws_step "
f"({current_ws_step} m/s)"
)
# If wd_step is passed in, confirm is it at least as large as the current step
if wd_step is not None:
if len(self.wind_directions) >= 2:
current_wd_step = check_and_identify_step_size(wind_directions=self.wind_directions)
if wd_step < current_wd_step:
raise ValueError(
"wd_step provided must be at least as large as the current wd_step "
f"({current_wd_step} degrees)"
)
# If either ws_step or wd_step is None, set it to the current step
if ws_step is None:
if len(self.wind_speeds) >= 2:
ws_step = self.wind_speeds[1] - self.wind_speeds[0]
else: # wind rose will have only a single wind speed, and we assume a ws_step of 1
ws_step = 1.0
if wd_step is None:
if len(self.wind_directions) >= 2:
wd_step = check_and_identify_step_size(wind_directions=self.wind_directions)
else: # wind rose will have only a single wind direction, and we assume a wd_step of 1
wd_step = 1.0
# Pass the flat versions of each quantity to build a TimeSeries model
time_series = TimeSeries(
self.wd_flat,
self.ws_flat,
self.ti_table_flat,
self.value_table_flat,
self.heterogeneous_map,
)
# Now build a new wind rose using the new steps
aggregated_wind_rose = time_series.to_WindRose(
wd_step=wd_step, ws_step=ws_step, bin_weights=self.freq_table_flat
)
if inplace:
self.__init__(
aggregated_wind_rose.wind_directions,
aggregated_wind_rose.wind_speeds,
aggregated_wind_rose.ti_table,
aggregated_wind_rose.freq_table,
aggregated_wind_rose.value_table,
aggregated_wind_rose.compute_zero_freq_occurrence,
aggregated_wind_rose.heterogeneous_map,
)
else:
return aggregated_wind_rose
[docs]
def resample_by_interpolation(self, wd_step=None, ws_step=None, method="linear", inplace=False):
"""
Wrapper to upsample method for backwards compatibility
"""
return self.upsample(wd_step, ws_step, method, inplace)
[docs]
def upsample(self, wd_step=None, ws_step=None, method="linear", inplace=False):
"""
Resample the wind rose using interpolation for upsampling. The method can be either
'linear' or 'nearest'. If inplace is set to True, the current WindRose
object will be updated with the resampled bins.
Args:
wd_step: Step size for wind direction resampling (float, optional).
If None, the current step size will be used. Defaults to None.
ws_step: Step size for wind speed resampling (float, optional).
If None, the current step size will be used. Defaults to None.
method: Interpolation method to use (str, optional). Can be either
'linear' or 'nearest'. Defaults to "linear".
inplace: Flag indicating whether to update the current WindRose
object when True or return a new WindRose object when False
(bool, optional). Defaults to False.
Returns:
WindRose: Resampled wind rose based on the provided or default step
sizes. Only returned if inplace = False.
"""
if method == "linear":
interpolator = LinearNDInterpolator
elif method == "nearest":
interpolator = NearestNDInterpolator
else:
raise ValueError(
f"Unknown interpolation method: '{method}'. "
"Available methods are 'linear' and 'nearest'"
)
# First establish the current ws_step and wd_step
if len(self.wind_speeds) >= 2:
ws_step_current = self.wind_speeds[1] - self.wind_speeds[0]
else: # wind rose will have only a single wind speed, and we assume a ws_step of 1
ws_step_current = 1.0
if len(self.wind_directions) >= 2:
# Identify the current step size
wd_step_current = check_and_identify_step_size(wind_directions=self.wind_directions)
else: # wind rose will have only a single wind direction, and we assume a wd_step of 1
wd_step_current = 1.0
# If either ws_step or wd_step is None, set it to the current step
if ws_step is None:
ws_step = ws_step_current
if wd_step is None:
wd_step = wd_step_current
# Make sure upsampling is appropriate
if wd_step > wd_step_current:
raise ValueError(
f"Provided wd_step ({wd_step}) is larger than the current "
f" wind direction step size. ({wd_step_current} degrees)"
" Use the downsample method."
)
if ws_step > ws_step_current:
raise ValueError(
f"Provided ws_step ({ws_step}) is larger than "
f"the current wind speed step size. ({ws_step_current} m/s)"
" Use the downsample method."
)
# Get the current wind directions in adjacent from (ie 0, 2 358 -> -2, 0 ,2)
if len(self.wind_directions) >= 2:
current_wind_directions, adjacent_sort_index = make_wind_directions_adjacent(
self.wind_directions
)
else:
current_wind_directions = self.wind_directions
adjacent_sort_index = np.arange(len(current_wind_directions))
# Identify the covered range of wind directions
wd_range_min_current = np.min(current_wind_directions) - wd_step_current / 2.0
wd_range_max_current = np.max(current_wind_directions) + wd_step_current / 2.0
# Look for unlikely case where for example wind directions are 8, 28, ... 358
if wd_range_max_current > 360:
# TODO: Handle this case without an error
raise ValueError(
"Cannot upsample wind rose for case when wind directions are defined"
" such that 0 degrees is included by bins to the left of 0 degrees. "
)
# Identify the new minimum wind direction
wd_min_new = wd_range_min_current + wd_step / 2.0
wd_max_new = wd_range_max_current - wd_step / 2.0
new_wind_directions = np.arange(wd_min_new, wd_max_new + wd_step / 2.0, wd_step)
# Set up the new wind speeds
ws_range_min_current = np.min(self.wind_speeds) - ws_step_current / 2.0
ws_range_max_current = np.max(self.wind_speeds) + ws_step_current / 2.0
ws_min_new = ws_range_min_current + ws_step / 2.0
ws_max_new = ws_range_max_current - ws_step / 2.0
# Force the new ws_min to 0 if negative
if ws_min_new < 0:
ws_min_new = 0.0
new_wind_speeds = np.arange(ws_min_new, ws_max_new + ws_step / 2.0, ws_step)
# Set up for interpolation by copying the current values
# and making sure they are sorted according to the adjacent wind directions
wind_direction_column = current_wind_directions.copy()
wind_speed_column = self.wind_speeds.copy()
ti_matrix = self.ti_table.copy()[adjacent_sort_index, :]
freq_matrix = self.freq_table.copy()[adjacent_sort_index, :]
if self.value_table is not None:
value_matrix = self.value_table.copy()[adjacent_sort_index, :]
else:
value_matrix = None
# For padding wind directions, there are two cases to consider. In the first,
# say that the wind directions are 30, 40, 50. In this case it's important append
# 30 and 50 to 35 and 55 to ensure the interpolation covers the full range of data
# This is the case when wind directions doesn't cover the full range of possible
# degrees (0-360)
if np.abs((wd_range_min_current % 360.0) - (wd_range_max_current % 360.0)) > 1e-6:
wind_direction_column = np.concatenate((
np.array([wd_range_min_current]),
wind_direction_column,
np.array([wd_range_max_current])
))
ti_matrix = ti_matrix = np.vstack((ti_matrix[0, :], ti_matrix, ti_matrix[-1,:]))
freq_matrix = np.vstack((freq_matrix[0, :], freq_matrix, freq_matrix[-1,:]))
if self.value_table is not None:
value_matrix = np.vstack((value_matrix[0, :], value_matrix, value_matrix[-1,:]))
# In the alternative case, where the wind directions cover the full range
# ie, 0, 10, 20 30, ...350, then need to place 0 at 360 and 350 at -10
# to cover all interpolations
else:
# Pad wind direction column with min_wd + 360
wind_direction_column = np.concatenate(
(
[np.max(self.wind_directions) - 360.0],
wind_direction_column,
[np.min(self.wind_directions) + 360.0],
)
)
# Pad the remaining with the appropriate value
ti_matrix = ti_matrix = np.vstack((ti_matrix[-1, :], ti_matrix, ti_matrix[0, :]))
freq_matrix = np.vstack((freq_matrix[-1, :], freq_matrix, freq_matrix[0, :]))
if self.value_table is not None:
value_matrix = np.vstack((value_matrix[-1, :], value_matrix, value_matrix[0, :]))
# Pad out the wind speeds
wind_speed_column = np.concatenate(
(
np.array([ws_range_min_current]),
wind_speed_column,
np.array([ws_range_max_current])
)
)
ti_matrix = np.hstack(
(ti_matrix[:, 0].reshape((-1, 1)), ti_matrix, ti_matrix[:, -1].reshape((-1, 1)))
)
freq_matrix = np.hstack(
(freq_matrix[:, 0].reshape((-1, 1)), freq_matrix, freq_matrix[:, -1].reshape((-1, 1)))
)
if self.value_table is not None:
value_matrix = np.hstack(
(
value_matrix[:, 0].reshape((-1, 1)),
value_matrix,
value_matrix[:, -1].reshape((-1, 1))
)
)
# Grid wind directions and wind speeds to match the ti_matrix and freq_matrix when flattened
wd_grid, ws_grid = np.meshgrid(wind_direction_column, wind_speed_column, indexing="ij")
# Form wd_grid and ws_grid to a 2-column matrix
wd_ws_mat = np.array([wd_grid.flatten(), ws_grid.flatten()]).T
# Build the interpolator from wd_grid, ws_grid, to ti_matrix, freq_matrix and value_matrix
ti_interpolator = interpolator(wd_ws_mat, ti_matrix.flatten())
freq_interpolator = interpolator(wd_ws_mat, freq_matrix.flatten())
if self.value_table is not None:
value_interpolator = interpolator(wd_ws_mat, value_matrix.flatten())
# Grid the new wind directions and wind speeds
new_wd_grid, new_ws_grid = np.meshgrid(new_wind_directions, new_wind_speeds, indexing="ij")
new_wd_ws_mat = np.array([new_wd_grid.flatten(), new_ws_grid.flatten()]).T
# Create the new ti_matrix and freq_matrix
new_ti_matrix = ti_interpolator(new_wd_ws_mat).reshape(
(len(new_wind_directions), len(new_wind_speeds))
)
new_freq_matrix = freq_interpolator(new_wd_ws_mat).reshape(
(len(new_wind_directions), len(new_wind_speeds))
)
if self.value_table is not None:
new_value_matrix = value_interpolator(new_wd_ws_mat).reshape(
(len(new_wind_directions), len(new_wind_speeds))
)
else:
new_value_matrix = None
# Wrap new_wind_directions to 0-360
new_wind_directions = new_wind_directions % 360
# Finally sort new_wind_directions, and re-order new_ti_matrix, new_freq_matrix
# and new_value_matrix accordingly
sort_indices = np.argsort(new_wind_directions)
new_wind_directions = new_wind_directions[sort_indices]
new_ti_matrix = new_ti_matrix[sort_indices, :]
new_freq_matrix = new_freq_matrix[sort_indices, :]
if self.value_table is not None:
new_value_matrix = new_value_matrix[sort_indices, :]
# Create the resampled wind rose
resampled_wind_rose = WindRose(
new_wind_directions,
new_wind_speeds,
new_ti_matrix,
new_freq_matrix,
new_value_matrix,
self.compute_zero_freq_occurrence,
self.heterogeneous_map,
)
if inplace:
self.__init__(
resampled_wind_rose.wind_directions,
resampled_wind_rose.wind_speeds,
resampled_wind_rose.ti_table,
resampled_wind_rose.freq_table,
resampled_wind_rose.value_table,
resampled_wind_rose.compute_zero_freq_occurrence,
resampled_wind_rose.heterogeneous_map,
)
else:
return resampled_wind_rose
[docs]
def plot(
self,
ax=None,
color_map="viridis_r",
wd_step=None,
ws_step=None,
legend_kwargs={"label": "Wind speed [m/s]"},
):
"""
This method creates a wind rose plot showing the frequency of occurrence
of the specified wind direction and wind speed bins. If no axis is
provided, a new one is created.
**Note**: Based on code provided by Patrick Murphy from the University
of Colorado Boulder.
Args:
ax (:py:class:`matplotlib.pyplot.axes`, optional): The figure axes
on which the wind rose is plotted. Defaults to None.
color_map (str, optional): Colormap to use. Defaults to 'viridis_r'.
wd_step: Step size for wind direction (float, optional). If None,
the current step size will be used. Defaults to None.
ws_step: Step size for wind speed (float, optional).
the current step size will be used. Defaults to None.
legend_kwargs (dict, optional): Keyword arguments to be passed to
ax.legend(). Defaults to {"label": "Wind speed [m/s]"}.
Returns:
:py:class:`matplotlib.pyplot.axes`: A figure axes object containing
the plotted wind rose.
"""
# Get a aggregated (downsampled) wind_rose
wind_rose_aggregate = self.downsample(wd_step, ws_step, inplace=False)
wd_bins = wind_rose_aggregate.wind_directions
ws_bins = wind_rose_aggregate.wind_speeds
freq_table = wind_rose_aggregate.freq_table
# Set up figure
if ax is None:
_, ax = plt.subplots(subplot_kw={"polar": True})
# Get the wd_step
if wd_step is None:
if len(wd_bins) >= 2:
wd_step = wd_bins[1] - wd_bins[0]
else:
# This admittedly an odd edge case
wd_step = 360.0
# Get a color array
color_array = plt.get_cmap(color_map, len(ws_bins))
norm_ws = mpl.colors.Normalize(vmin=np.min(ws_bins), vmax=np.max(ws_bins))
sm_ws = mpl.cm.ScalarMappable(norm=norm_ws, cmap=color_array)
for wd_idx, wd in enumerate(wd_bins):
rects = []
freq_table_sub = freq_table[wd_idx, :].flatten()
for ws_idx, ws in reversed(list(enumerate(ws_bins))):
plot_val = freq_table_sub[: ws_idx + 1].sum()
rects.append(
ax.bar(
np.radians(wd),
plot_val,
width=0.9 * np.radians(wd_step),
color=color_array(ws_idx),
edgecolor="k",
)
)
# Configure the plot
ax.figure.colorbar(sm_ws, ax=ax, **legend_kwargs)
ax.figure.tight_layout()
ax.set_theta_direction(-1)
ax.set_theta_offset(np.pi / 2.0)
ax.set_theta_zero_location("N")
ax.set_xticks(np.arange(0, 2 * np.pi, np.pi / 4))
ax.set_xticklabels(["N", "NE", "E", "SE", "S", "SW", "W", "NW"])
return ax
[docs]
def assign_ti_using_wd_ws_function(self, func):
"""
Use the passed in function to assign new values to turbulence_intensities
Args:
func (function): Function which accepts wind_directions as its
first argument and wind_speeds as second argument and returns
turbulence_intensities
"""
self.ti_table = func(self.wd_grid, self.ws_grid)
self._build_gridded_and_flattened_version()
[docs]
def assign_ti_using_IEC_method(self, Iref=0.07, offset=3.8):
"""
Define TI as a function of wind speed by specifying an Iref and offset
value as in the normal turbulence model in the IEC 61400-1 standard
Args:
Iref (float): Reference turbulence level, defined as the expected
value of TI at 15 m/s. Default = 0.07. Note this value is
lower than the values of Iref for turbulence classes A, B, and
C in the IEC standard (0.16, 0.14, and 0.12, respectively), but
produces TI values more in line with those typically used in
FLORIS. When the default Iref and offset are used, the TI at
8 m/s is 8.6%.
offset (float): Offset value to equation. Default = 3.8, as defined
in the IEC standard to give the expected value of TI for
each wind speed.
"""
if (Iref < 0) or (Iref > 1):
raise ValueError("Iref must be >= 0 and <=1")
def iref_func(wind_directions, wind_speeds):
sigma_1 = Iref * (0.75 * wind_speeds + offset)
return sigma_1 / wind_speeds
self.assign_ti_using_wd_ws_function(iref_func)
[docs]
def plot_ti_over_ws(
self,
ax=None,
marker=".",
ls="None",
color="k",
):
"""
Scatter plot the turbulence_intensities against wind_speeds
Args:
ax (:py:class:`matplotlib.pyplot.axes`, optional): The figure axes
on which the turbulence intensity is plotted. Defaults to None.
marker (str, optional): Scatter plot marker style. Defaults to ".".
ls (str, optional): Scatter plot line style. Defaults to "None".
color (str, optional): Scatter plot color. Defaults to "k".
Returns:
:py:class:`matplotlib.pyplot.axes`: A figure axes object containing
the plotted turbulence intensities as a function of wind speed.
"""
# TODO: Plot mean and std. devs. of TI in each ws bin in addition to
# individual points
# Set up figure
if ax is None:
_, ax = plt.subplots()
ax.plot(self.ws_flat, self.ti_table_flat * 100, marker=marker, ls=ls, color=color)
ax.set_xlabel("Wind Speed (m/s)")
ax.set_ylabel("Turbulence Intensity (%)")
ax.grid(True)
[docs]
def assign_value_using_wd_ws_function(self, func, normalize=False):
"""
Use the passed in function to assign new values to the value table.
Args:
func (function): Function which accepts wind_directions as its
first argument and wind_speeds as second argument and returns
values.
normalize (bool, optional): If True, the value array will be
normalized by the mean value. Defaults to False.
"""
self.value_table = func(self.wd_grid, self.ws_grid)
if normalize:
self.value_table /= np.sum(self.freq_table * self.value_table)
self._build_gridded_and_flattened_version()
[docs]
def assign_value_piecewise_linear(
self,
value_zero_ws=1.425,
ws_knee=4.5,
slope_1=0.0,
slope_2=-0.135,
limit_to_zero=False,
normalize=False,
):
"""
Define value as a continuous piecewise linear function of wind speed
with two line segments. The default parameters yield a value function
that approximates the normalized mean electricity price vs. wind speed
curve for the SPP market in the U.S. for years 2018-2020 from figure 7
in Simley et al. "The value of wake steering wind farm flow control in
US energy markets," Wind Energy Science, 2024.
https://doi.org/10.5194/wes-9-219-2024. This default value function is
constant at low wind speeds, then linearly decreases above 4.5 m/s.
Args:
value_zero_ws (float, optional): The value when wind speed is zero.
Defaults to 1.425.
ws_knee (float, optional): The wind speed separating line segments
1 and 2. Default = 4.5 m/s.
slope_1 (float, optional): The slope of the first line segment
(unit of value per m/s). Defaults to zero.
slope_2 (float, optional): The slope of the second line segment
(unit of value per m/s). Defaults to -0.135.
limit_to_zero (bool, optional): If True, negative values will be
set to zero. Defaults to False.
normalize (bool, optional): If True, the value array will be
normalized by the mean value. Defaults to False.
"""
def piecewise_linear_value_func(wind_directions, wind_speeds):
value = np.zeros_like(wind_speeds, dtype=float)
value[wind_speeds < ws_knee] = (
slope_1 * wind_speeds[wind_speeds < ws_knee] + value_zero_ws
)
offset_2 = (slope_1 - slope_2) * ws_knee + value_zero_ws
value[wind_speeds >= ws_knee] = slope_2 * wind_speeds[wind_speeds >= ws_knee] + offset_2
if limit_to_zero:
value[value < 0] = 0.0
return value
self.assign_value_using_wd_ws_function(piecewise_linear_value_func, normalize)
[docs]
def plot_value_over_ws(
self,
ax=None,
marker=".",
ls="None",
color="k",
):
"""
Scatter plot the value of the energy generated against wind speed.
Args:
ax (:py:class:`matplotlib.pyplot.axes`, optional): The figure axes
on which the value is plotted. Defaults to None.
marker (str, optional): Scatter plot marker style. Defaults to ".".
ls (str, optional): Scatter plot line style. Defaults to "None".
color (str, optional): Scatter plot color. Defaults to "k".
Returns:
:py:class:`matplotlib.pyplot.axes`: A figure axes object containing
the plotted value as a function of wind speed.
"""
# TODO: Plot mean and std. devs. of value in each ws bin in addition to
# individual points
# Set up figure
if ax is None:
_, ax = plt.subplots()
ax.plot(self.ws_flat, self.value_table_flat, marker=marker, ls=ls, color=color)
ax.set_xlabel("Wind Speed (m/s)")
ax.set_ylabel("Value")
ax.grid(True)
[docs]
@staticmethod
def read_csv_long(
file_path: str,
ws_col: str = "wind_speeds",
wd_col: str = "wind_directions",
ti_col_or_value: str | float = "turbulence_intensities",
freq_col: str | None = None,
sep: str = ",",
) -> WindRose:
"""
Read a long-formatted CSV file into the wind rose object. By long, what is meant
is that the wind speed, wind direction combination is given for each row in the
CSV file. The wind speed, wind direction, are
given in separate columns, and the frequency of occurrence of each combination
is given in a separate column. The frequency column is optional, and if not
provided, uniform frequency of all bins is assumed.
The value of ti_col_or_value can be either a string or a float. If it is a string,
it is assumed to be the name of the column in the CSV file that contains the
turbulence intensity values. If it is a float, it is assumed to be a constant
turbulence intensity value for all wind speed and direction combinations.
Args:
file_path (str): Path to the CSV file.
ws_col (str): Name of the column in the CSV file that contains the wind speed
values. Defaults to 'wind_speeds'.
wd_col (str): Name of the column in the CSV file that contains the wind direction
values. Defaults to 'wind_directions'.
ti_col_or_value (str or float): Name of the column in the CSV file that contains
the turbulence intensity values, or a constant turbulence intensity value.
freq_col (str): Name of the column in the CSV file that contains the frequency
values. Defaults to None in which case constant frequency assumed.
sep (str): Delimiter to use. Defaults to ','.
Returns:
WindRose: Wind rose object created from the CSV file.
"""
# Read in the CSV file
try:
df = pd.read_csv(file_path, sep=sep)
except FileNotFoundError:
# If the file cannot be found, then attempt the level above
base_fn = Path(inspect.stack()[-1].filename).resolve().parent
file_path = base_fn / file_path
df = pd.read_csv(file_path, sep=sep)
# Check that ti_col_or_value is a string or a float
if not isinstance(ti_col_or_value, (str, float)):
raise TypeError("ti_col_or_value must be a string or a float")
# Check that the required columns are present
if ws_col not in df.columns:
raise ValueError(f"Column {ws_col} not found in CSV file")
if wd_col not in df.columns:
raise ValueError(f"Column {wd_col} not found in CSV file")
if ti_col_or_value not in df.columns and isinstance(ti_col_or_value, str):
raise ValueError(f"Column {ti_col_or_value} not found in CSV file")
if freq_col not in df.columns and freq_col is not None:
raise ValueError(f"Column {freq_col} not found in CSV file")
# Get the wind speed, wind direction, and turbulence intensity values
wind_directions = df[wd_col].values
wind_speeds = df[ws_col].values
if isinstance(ti_col_or_value, str):
turbulence_intensities = df[ti_col_or_value].values
else:
turbulence_intensities = ti_col_or_value * np.ones(len(wind_speeds))
if freq_col is not None:
freq_values = df[freq_col].values
else:
freq_values = np.ones(len(wind_speeds))
# Normalize freq_values
freq_values = freq_values / np.sum(freq_values)
# Get the unique values of wind directions and wind speeds
unique_wd = np.unique(wind_directions)
unique_ws = np.unique(wind_speeds)
# Get the step side for wind direction and wind speed
wd_step = unique_wd[1] - unique_wd[0]
ws_step = unique_ws[1] - unique_ws[0]
# Now use TimeSeries to create a wind rose
time_series = TimeSeries(wind_directions, wind_speeds, turbulence_intensities)
# Now build a new wind rose using the new steps
return time_series.to_WindRose(wd_step=wd_step, ws_step=ws_step, bin_weights=freq_values)
[docs]
class WindTIRose(WindDataBase):
"""
WindTIRose is similar to the WindRose class, but contains turbulence
intensity as an additional wind rose dimension instead of being defined
as a function of wind direction and wind speed. The class is used to drive
FLORIS and optimization operations in which the inflow is characterized by
the frequency of binned wind speed, wind direction, and turbulence intensity
values.
Args:
wind_directions: NumPy array of wind directions (NDArrayFloat).
wind_speeds: NumPy array of wind speeds (NDArrayFloat).
turbulence_intensities: NumPy array of turbulence intensities (NDArrayFloat).
freq_table: Frequency table for binned wind direction, wind speed, and
turbulence intensity values (NDArrayFloat, optional). Must have
dimension (n_wind_directions, n_wind_speeds, n_turbulence_intensities).
Defaults to None in which case uniform frequency of all bins is
assumed.
value_table: Value table for binned wind direction, wind
speed, and turbulence intensity values (NDArrayFloat, optional).
Must have dimension (n_wind_directions, n_wind_speeds,
n_turbulence_intensities). Defaults to None in which case uniform
values are assumed. Value can be used to weight power in each bin
to compute the total value of the energy produced.
compute_zero_freq_occurrence: Flag indicating whether to compute zero
frequency occurrences (bool, optional). Defaults to False.
heterogeneous_map (HeterogeneousMap, optional): A HeterogeneousMap object to define
background heterogeneous inflow condition as a function
of wind direction and wind speed. Alternatively, a dictionary can be
passed in to define a HeterogeneousMap object. Defaults to None.
heterogeneous_inflow_config_by_wd (dict, optional): A dictionary containing the following
which can be used to define a heterogeneous_map object (note this parameter is kept
for backwards compatibility and is not recommended for use):
* 'x': A 1D NumPy array (size num_points) of x-coordinates (meters).
* 'y': A 1D NumPy array (size num_points) of y-coordinates (meters).
* 'speed_multipliers': A 2D NumPy array (size num_wd (or num_ws) x num_points)
of speed multipliers. If neither wind_directions nor wind_speeds are
defined, then this should be a single row array
* 'wind_directions': A 1D NumPy array (size num_wd) of wind directions (degrees).
Optional.
* 'wind_speeds': A 1D NumPy array (size num_ws) of wind speeds (m/s). Optional.
Defaults to None.
"""
def __init__(
self,
wind_directions: NDArrayFloat,
wind_speeds: NDArrayFloat,
turbulence_intensities: NDArrayFloat,
freq_table: NDArrayFloat | None = None,
value_table: NDArrayFloat | None = None,
compute_zero_freq_occurrence: bool = False,
heterogeneous_map: HeterogeneousMap | dict | None = None,
heterogeneous_inflow_config_by_wd: dict | None = None,
):
if not isinstance(wind_directions, np.ndarray):
raise TypeError("wind_directions must be a NumPy array")
if not isinstance(wind_speeds, np.ndarray):
raise TypeError("wind_speeds must be a NumPy array")
if not isinstance(turbulence_intensities, np.ndarray):
raise TypeError("turbulence_intensities must be a NumPy array")
# Confirm that both wind_directions and wind_speeds
# and turbulence intensities are monotonically
# increasing and evenly spaced
if len(wind_directions) > 1:
# Check monotonically increasing
if not np.all(np.diff(wind_directions) > 0):
raise ValueError("wind_directions must be monotonically increasing")
# Check evenly spaced (Function will raise error if not)
check_and_identify_step_size(wind_directions=wind_directions)
if len(wind_speeds) > 1:
# Check monotonically increasing
if not np.all(np.diff(wind_speeds) > 0):
raise ValueError("wind_speeds must be monotonically increasing")
# Check evenly spaced
if not np.allclose(np.diff(wind_speeds), wind_speeds[1] - wind_speeds[0]):
raise ValueError("wind_speeds must be evenly spaced")
if len(turbulence_intensities) > 1:
# Check monotonically increasing
if not np.all(np.diff(turbulence_intensities) > 0):
raise ValueError("turbulence_intensities must be monotonically increasing")
# Check evenly spaced
if not np.allclose(
np.diff(turbulence_intensities),
turbulence_intensities[1] - turbulence_intensities[0],
):
raise ValueError("turbulence_intensities must be evenly spaced")
# Save the wind speeds and directions
self.wind_directions = wind_directions
self.wind_speeds = wind_speeds
self.turbulence_intensities = turbulence_intensities
# If freq_table is not None, confirm it has correct dimension,
# otherwise initialize to uniform probability
if freq_table is not None:
if not freq_table.shape[0] == len(wind_directions):
raise ValueError("freq_table first dimension must equal len(wind_directions)")
if not freq_table.shape[1] == len(wind_speeds):
raise ValueError("freq_table second dimension must equal len(wind_speeds)")
if not freq_table.shape[2] == len(turbulence_intensities):
raise ValueError(
"freq_table third dimension must equal len(turbulence_intensities)"
)
self.freq_table = freq_table
else:
self.freq_table = np.ones(
(len(wind_directions), len(wind_speeds), len(turbulence_intensities))
)
# Normalize freq table
self.freq_table = self.freq_table / np.sum(self.freq_table)
# If value_table is not None, confirm it has correct dimension,
# otherwise initialize to all ones
if value_table is not None:
if not value_table.shape[0] == len(wind_directions):
raise ValueError("value_table first dimension must equal len(wind_directions)")
if not value_table.shape[1] == len(wind_speeds):
raise ValueError("value_table second dimension must equal len(wind_speeds)")
if not value_table.shape[2] == len(turbulence_intensities):
raise ValueError(
"value_table third dimension must equal len(turbulence_intensities)"
)
self.value_table = value_table
# Save whether zero occurrence cases should be computed
self.compute_zero_freq_occurrence = compute_zero_freq_occurrence
# Check that heterogeneous_map and heterogeneous_inflow_config_by_wd are not both defined
if heterogeneous_map is not None and heterogeneous_inflow_config_by_wd is not None:
raise ValueError(
"Only one of heterogeneous_map and heterogeneous_inflow_config_by_wd can be"
+ " defined."
)
# If heterogeneous_inflow_config_by_wd is not None, then create a HeterogeneousMap object
# using the dictionary
if heterogeneous_inflow_config_by_wd is not None:
# TODO: In future, add deprectation warning for this parameter here
self.heterogeneous_map = HeterogeneousMap(**heterogeneous_inflow_config_by_wd)
# Else if heterogeneous_map is not None
elif heterogeneous_map is not None:
# If heterogeneous_map is a dictionary, then create a HeterogeneousMap object
if isinstance(heterogeneous_map, dict):
self.heterogeneous_map = HeterogeneousMap(**heterogeneous_map)
# Else if heterogeneous_map is a HeterogeneousMap object, then save it
elif isinstance(heterogeneous_map, HeterogeneousMap):
self.heterogeneous_map = heterogeneous_map
# Else raise an error
else:
raise ValueError(
"heterogeneous_map must be a HeterogeneousMap object or a dictionary."
)
# Else if neither heterogeneous_map nor heterogeneous_inflow_config_by_wd are defined,
# then set heterogeneous_map to None
else:
self.heterogeneous_map = None
# Build the gridded and flatten versions
self._build_gridded_and_flattened_version()
def _build_gridded_and_flattened_version(self):
"""
Given the wind direction, wind speed, and turbulence intensity array,
build the gridded versions covering all combinations, and then flatten
versions which put all combinations into 1D array
"""
# Gridded wind speed and direction
self.wd_grid, self.ws_grid, self.ti_grid = np.meshgrid(
self.wind_directions, self.wind_speeds, self.turbulence_intensities, indexing="ij"
)
# Flat wind direction, wind speed, and turbulence intensity
self.wd_flat = self.wd_grid.flatten()
self.ws_flat = self.ws_grid.flatten()
self.ti_flat = self.ti_grid.flatten()
# Flat frequency table
self.freq_table_flat = self.freq_table.flatten()
# value table
if self.value_table is not None:
self.value_table_flat = self.value_table.flatten()
else:
self.value_table_flat = None
# Set mask to non-zero frequency cases depending on compute_zero_freq_occurrence
if self.compute_zero_freq_occurrence:
# If computing zero freq occurrences, then this is all True
self.non_zero_freq_mask = [True for i in range(len(self.freq_table_flat))]
else:
self.non_zero_freq_mask = self.freq_table_flat > 0.0
# N_findex should only be the calculated cases
self.n_findex = np.sum(self.non_zero_freq_mask)
[docs]
def unpack(self):
"""
Unpack the flattened versions of the matrices and return the values
accounting for the non_zero_freq_mask
"""
# The unpacked versions start as the flat version of each
wind_directions_unpack = self.wd_flat.copy()
wind_speeds_unpack = self.ws_flat.copy()
turbulence_intensities_unpack = self.ti_flat.copy()
freq_table_unpack = self.freq_table_flat.copy()
# Now mask thes values according to self.non_zero_freq_mask
wind_directions_unpack = wind_directions_unpack[self.non_zero_freq_mask]
wind_speeds_unpack = wind_speeds_unpack[self.non_zero_freq_mask]
turbulence_intensities_unpack = turbulence_intensities_unpack[self.non_zero_freq_mask]
freq_table_unpack = freq_table_unpack[self.non_zero_freq_mask]
# Now get unpacked value table
if self.value_table_flat is not None:
value_table_unpack = self.value_table_flat[self.non_zero_freq_mask].copy()
else:
value_table_unpack = None
# If heterogeneous_map is not None, then get the heterogeneous_inflow_config
if self.heterogeneous_map is not None:
heterogeneous_inflow_config = self.heterogeneous_map.get_heterogeneous_inflow_config(
wind_directions=wind_directions_unpack, wind_speeds=wind_speeds_unpack
)
else:
heterogeneous_inflow_config = None
return (
wind_directions_unpack,
wind_speeds_unpack,
turbulence_intensities_unpack,
freq_table_unpack,
value_table_unpack,
heterogeneous_inflow_config,
)
[docs]
def aggregate(self, wd_step=None, ws_step=None, ti_step=None, inplace=False):
"""
Wrapper for downsample method for backwards compatibility
"""
return self.downsample(wd_step, ws_step, ti_step, inplace)
[docs]
def downsample(self, wd_step=None, ws_step=None, ti_step=None, inplace=False):
"""
Aggregates the wind TI rose into fewer wind direction, wind speed and TI bins.
It is necessary the wd_step and ws_step ti_step passed in are at least as
large as the current wind direction and wind speed steps. If they are
not, the function will raise an error.
The function will return a new WindTIRose object with the aggregated
wind direction, wind speed and TI bins. If inplace is set to True, the
current WindTIRose object will be updated with the aggregated bins.
Args:
wd_step: Step size for wind direction resampling (float, optional).
ws_step: Step size for wind speed resampling (float, optional).
ti_step: Step size for turbulence intensity resampling (float, optional).
inplace: Flag indicating whether to update the current WindTIRose.
Defaults to False.
Returns:
WindTIRose: Aggregated wind TI rose based on the provided or default step sizes.
Notes:
- Returns an aggregated version of the wind TI rose using new `ws_step`,
`wd_step`, and `ti_step`.
- Uses the bin weights feature in TimeSeries to aggregate the wind rose.
- If `ws_step`, `wd_step`, or `ti_step` are not specified, it uses
the current values.
"""
# If ws_step is passed in, confirm is it at least as large as the current step
if ws_step is not None:
if len(self.wind_speeds) >= 2:
current_ws_step = self.wind_speeds[1] - self.wind_speeds[0]
if ws_step < current_ws_step:
raise ValueError(
"ws_step provided must be at least as large as the current ws_step "
f"({current_ws_step} m/s)"
)
# If wd_step is passed in, confirm is it at least as large as the current step
if wd_step is not None:
if len(self.wind_directions) >= 2:
current_wd_step = check_and_identify_step_size(wind_directions=self.wind_directions)
if wd_step < current_wd_step:
raise ValueError(
"wd_step provided must be at least as large as the current wd_step "
f"({current_wd_step} degrees)"
)
# If ti_step is passed in, confirm is it at least as large as the current step
if ti_step is not None:
if len(self.turbulence_intensities) >= 2:
current_ti_step = self.turbulence_intensities[1] - self.turbulence_intensities[0]
if ti_step < current_ti_step:
raise ValueError(
"ti_step provided must be at least as large as the current ti_step "
f"({current_ti_step})"
)
# If ws_step, wd_step or ti_step is none, set it to the current step
if ws_step is None:
if len(self.wind_speeds) >= 2:
ws_step = self.wind_speeds[1] - self.wind_speeds[0]
else: # wind rose will have only a single wind speed, and we assume a ws_step of 1
ws_step = 1.0
if wd_step is None:
if len(self.wind_directions) >= 2:
wd_step = check_and_identify_step_size(wind_directions=self.wind_directions)
else: # wind rose will have only a single wind direction, and we assume a wd_step of 1
wd_step = 1.0
if ti_step is None:
if len(self.turbulence_intensities) >= 2:
ti_step = self.turbulence_intensities[1] - self.turbulence_intensities[0]
else: # wind rose will have only a single TI, and we assume a ti_step of 1
ti_step = 1.0
# Pass the flat versions of each quantity to build a TimeSeries model
time_series = TimeSeries(
self.wd_flat,
self.ws_flat,
self.ti_flat,
self.value_table_flat,
self.heterogeneous_map,
)
# Now build a new wind rose using the new steps
aggregated_wind_rose = time_series.to_WindTIRose(
wd_step=wd_step, ws_step=ws_step, ti_step=ti_step, bin_weights=self.freq_table_flat
)
if inplace:
self.__init__(
aggregated_wind_rose.wind_directions,
aggregated_wind_rose.wind_speeds,
aggregated_wind_rose.turbulence_intensities,
aggregated_wind_rose.freq_table,
aggregated_wind_rose.value_table,
aggregated_wind_rose.compute_zero_freq_occurrence,
aggregated_wind_rose.heterogeneous_map,
)
else:
return aggregated_wind_rose
[docs]
def resample_by_interpolation(self, wd_step=None, ws_step=None, method="linear", inplace=False):
"""
Wrapper to upsample method for backwards compatibility
"""
return self.upsample(wd_step, ws_step, method, inplace)
[docs]
def upsample(self, wd_step=None, ws_step=None, ti_step=None, method="linear", inplace=False):
"""
Resample the wind TI rose using interpolation. The method can be either
'linear' or 'nearest'. If inplace is set to True, the current WindTIRose
object will be updated with the resampled bins.
Args:
wd_step: Step size for wind direction resampling (float, optional).
If None, the current step size will be used. Defaults to None.
ws_step: Step size for wind speed resampling (float, optional).
If None, the current step size will be used. Defaults to None.
ti_step: Step size for turbulence intensity resampling (float, optional).
If None, the current step size will be used. Defaults to None.
method: Interpolation method to use (str, optional). Can be either
'linear' or 'nearest'. Defaults to "linear".
inplace: Flag indicating whether to update the current WindRose
object when True or return a new WindRose object when False
(bool, optional). Defaults to False.
Returns:
WindRose: Resampled wind rose based on the provided or default step
sizes. Only returned if inplace = False.
"""
if method == "linear":
interpolator = LinearNDInterpolator
elif method == "nearest":
interpolator = NearestNDInterpolator
else:
raise ValueError(
f"Unknown interpolation method: '{method}'. "
"Available methods are 'linear' and 'nearest'"
)
# First establish the current ws_step and wd_step and ti_step
if len(self.wind_speeds) >= 2:
ws_step_current = self.wind_speeds[1] - self.wind_speeds[0]
else: # wind rose will have only a single wind speed, and we assume a ws_step of 1
ws_step_current = 1.0
if len(self.wind_directions) >= 2:
wd_step_current = check_and_identify_step_size(wind_directions=self.wind_directions)
else: # wind rose will have only a single wind direction, and we assume a wd_step of 1
wd_step_current = 1.0
if len(self.turbulence_intensities) >= 2:
ti_step_current = self.turbulence_intensities[1] - self.turbulence_intensities[0]
else: # wind rose will have only a single turbulence intensity,
# and we assume a ti_step of 1
ti_step_current = 1.0
# If either ws_step or wd_step or ti_step is None, set it to the current step
if ws_step is None:
ws_step = ws_step_current
if wd_step is None:
wd_step = wd_step_current
if ti_step is None:
ti_step = ti_step_current
# Make sure upsampling is appropriate
if wd_step > wd_step_current:
raise ValueError(
f"Provided wd_step ({wd_step}) is larger than the current "
f" wind direction step size. ({wd_step_current} degrees)"
" Use the downsample method."
)
if ws_step > ws_step_current:
raise ValueError(
f"Provided ws_step ({ws_step}) is larger than "
f"the current wind speed step size. ({ws_step_current} m/s)"
" Use the downsample method."
)
if ti_step > ti_step_current:
raise ValueError(
f"Provided ti_step ({ti_step}) is larger than "
f"the current turbulence intensity step size. ({ti_step_current})"
" Use the downsample method."
)
# Get the current wind directions in adjacent from (ie 0, 2 358 -> -2, 0 ,2)
if len(self.wind_directions) >= 2:
current_wind_directions, adjacent_sort_index = make_wind_directions_adjacent(
self.wind_directions
)
else:
current_wind_directions = self.wind_directions
adjacent_sort_index = np.arange(len(current_wind_directions))
# Identify the covered range of wind directions
wd_range_min_current = np.min(current_wind_directions) - wd_step_current / 2.0
wd_range_max_current = np.max(current_wind_directions) + wd_step_current / 2.0
# Look for unlikely case where for example wind directions are 8, 28, ... 358
if wd_range_max_current > 360:
# TODO: Handle this case without an error
raise ValueError(
"Cannot upsample wind rose for case when wind directions are defined"
" such that 0 degrees is included by bins to the left of 0 degrees. "
)
# Identify the new minimum wind direction
wd_min_new = wd_range_min_current + wd_step / 2.0
wd_max_new = wd_range_max_current - wd_step / 2.0
new_wind_directions = np.arange(wd_min_new, wd_max_new + wd_step / 2.0, wd_step)
# Set up the new wind speeds
ws_range_min_current = np.min(self.wind_speeds) - ws_step_current / 2.0
ws_range_max_current = np.max(self.wind_speeds) + ws_step_current / 2.0
ws_min_new = ws_range_min_current + ws_step / 2.0
ws_max_new = ws_range_max_current - ws_step / 2.0
# Force the new ws_min to 0 if negative
if ws_min_new < 0:
ws_min_new = 0.0
new_wind_speeds = np.arange(ws_min_new, ws_max_new + ws_step / 2.0, ws_step)
# Set up the new turbulence intensities
ti_range_min_current = np.min(self.turbulence_intensities) - ti_step_current / 2.0
ti_range_max_current = np.max(self.turbulence_intensities) + ti_step_current / 2.0
ti_min_new = ti_range_min_current + ti_step / 2.0
ti_max_new = ti_range_max_current - ti_step / 2.0
# Force the new ti_min to 0 if negative
if ti_min_new < 0:
ti_min_new = 0.0
new_turbulence_intensities = np.arange(ti_min_new, ti_max_new + ti_step / 2.0, ti_step)
# Set up for interpolation by copying the current values
# and making sure they are sorted according to the adjacent wind directions
wind_direction_column = current_wind_directions.copy()
wind_speed_column = self.wind_speeds.copy()
turbulence_intensity_column = self.turbulence_intensities.copy()
freq_matrix = self.freq_table.copy()[adjacent_sort_index, :, :]
if self.value_table is not None:
value_matrix = self.value_table.copy()[adjacent_sort_index, :, :]
else:
value_matrix = None
# For padding wind directions, there are two cases to consider. In the first,
# say that the wind directions are 30, 40, 50. In this case it's important append
# 30 and 50 to 35 and 55 to ensure the interpolation covers the full range of data
# This is the case when wind directions doesn't cover the full range of possible
# degrees (0-360)
if np.abs((wd_range_min_current % 360.0) - (wd_range_max_current % 360.0)) > 1e-6:
wind_direction_column = np.concatenate(
(
np.array([wd_range_min_current]),
wind_direction_column,
np.array([wd_range_max_current])
)
)
freq_matrix = np.concatenate(
(freq_matrix[0, :, :][None, :, :], freq_matrix, freq_matrix[-1, :, :][None, :, :]),
axis=0
)
if self.value_table is not None:
value_matrix = np.concatenate(
(
value_matrix[0, :, :][None, :, :],
value_matrix,
value_matrix[-1, :, :][None, :, :]
),
axis=0
)
# In the alternative case, where the wind directions cover the full range
# ie, 0, 10, 20 30, ...350, then need to place 0 at 360 and 350 at -10
# to cover all interpolations
else:
# Pad wind direction column with min_wd + 360
wind_direction_column = np.concatenate(
(
[np.max(self.wind_directions) - 360.0],
wind_direction_column,
[np.min(self.wind_directions) + 360.0],
)
)
# Pad the remaining with the appropriate value
freq_matrix = np.vstack(
(freq_matrix[-1, :, :][None, :, :], freq_matrix, freq_matrix[0, :, :][None, :, :])
)
if self.value_table is not None:
value_matrix = np.vstack(
(
value_matrix[-1, :, :][None, :, :],
value_matrix,
value_matrix[0, :, :][None, :, :],
)
)
# Pad out the wind speeds
wind_speed_column = np.concatenate(
(
np.array([ws_range_min_current]),
wind_speed_column,
np.array([ws_range_max_current])
)
)
freq_matrix = np.concatenate(
(freq_matrix[:, 0, :][:, None, :], freq_matrix, freq_matrix[:, -1, :][:, None, :]),
axis=1
)
if self.value_table is not None:
value_matrix = np.concatenate(
(
value_matrix[:, 0, :][:, None, :],
value_matrix,
value_matrix[:, -1, :][:, None, :]
),
axis=1
)
# Pad out the turbulence intensities
turbulence_intensity_column = np.concatenate(
(
np.array([ti_range_min_current]),
turbulence_intensity_column,
np.array([ti_range_max_current])
)
)
freq_matrix = np.concatenate(
(freq_matrix[:, :, 0][:, :, None], freq_matrix, freq_matrix[:, :, -1][:, :, None]),
axis=2
)
if self.value_table is not None:
value_matrix = np.concatenate(
(
value_matrix[:, :, 0][:, :, None],
value_matrix,
value_matrix[:, :, -1][:, :, None]
),
axis=2
)
# Grid wind directions, wind speeds and turbulence intensities to match the
# freq_matrix when flattened
wd_grid, ws_grid, ti_grid = np.meshgrid(
wind_direction_column, wind_speed_column, turbulence_intensity_column, indexing="ij"
)
# Form wd_grid and ws_grid to a 2-column matrix
wd_ws_ti_mat = np.array([wd_grid.flatten(), ws_grid.flatten(), ti_grid.flatten()]).T
# Build the interpolator from wd_grid, ws_grid, to ti_matrix, freq_matrix and value_matrix
freq_interpolator = interpolator(wd_ws_ti_mat, freq_matrix.flatten())
if self.value_table is not None:
value_interpolator = interpolator(wd_ws_ti_mat, value_matrix.flatten())
# Grid the new wind directions and wind speeds
new_wd_grid, new_ws_grid, new_ti_grid = np.meshgrid(
new_wind_directions, new_wind_speeds, new_turbulence_intensities, indexing="ij"
)
new_wd_ws_ti_mat = np.array(
[new_wd_grid.flatten(), new_ws_grid.flatten(), new_ti_grid.flatten()]
).T
# Create the new freq_matrix and value_matrix
new_freq_matrix = freq_interpolator(new_wd_ws_ti_mat).reshape(
(len(new_wind_directions), len(new_wind_speeds), len(new_turbulence_intensities))
)
if self.value_table is not None:
new_value_matrix = value_interpolator(new_wd_ws_ti_mat).reshape(
(len(new_wind_directions), len(new_wind_speeds), len(new_turbulence_intensities))
)
else:
new_value_matrix = None
# Wrap new_wind_directions to 0-360
new_wind_directions = new_wind_directions % 360
# Finally sort new_wind_directions, and re-order new_ti_matrix, new_freq_matrix
# and new_value_matrix accordingly
sort_indices = np.argsort(new_wind_directions)
new_wind_directions = new_wind_directions[sort_indices]
new_freq_matrix = new_freq_matrix[sort_indices, :, :]
if self.value_table is not None:
new_value_matrix = new_value_matrix[sort_indices, :, :]
# Create the resampled wind rose
resampled_wind_rose = WindTIRose(
new_wind_directions,
new_wind_speeds,
new_turbulence_intensities,
new_freq_matrix,
new_value_matrix,
self.compute_zero_freq_occurrence,
self.heterogeneous_map,
)
if inplace:
self.__init__(
resampled_wind_rose.wind_directions,
resampled_wind_rose.wind_speeds,
resampled_wind_rose.turbulence_intensities,
resampled_wind_rose.freq_table,
resampled_wind_rose.value_table,
resampled_wind_rose.compute_zero_freq_occurrence,
resampled_wind_rose.heterogeneous_map,
)
else:
return resampled_wind_rose
[docs]
def plot(
self,
ax=None,
wind_rose_var="ws",
color_map="viridis_r",
wd_step=15.0,
wind_rose_var_step=None,
legend_kwargs={"label": "Wind speed [m/s]"},
):
"""
This method creates a wind rose plot showing the frequency of occurrence
of either the specified wind direction and wind speed bins or wind
direction and turbulence intensity bins. If no axis is provided, a new
one is created.
**Note**: Based on code provided by Patrick Murphy from the University
of Colorado Boulder.
Args:
ax (:py:class:`matplotlib.pyplot.axes`, optional): The figure axes
on which the wind rose is plotted. Defaults to None.
wind_rose_var (str, optional): The variable to display in the wind
rose plot in addition to wind direction. If
wind_rose_var = "ws", wind speed frequencies will be plotted.
If wind_rose_var = "ti", turbulence intensity frequencies will
be plotted. Defaults to "ws".
color_map (str, optional): Colormap to use. Defaults to 'viridis_r'.
wd_step (float, optional): Step size for wind direction. Defaults
to 15 degrees.
wind_rose_var_step (float, optional): Step size for other wind rose
variable. Defaults to None. If unspecified, a value of 5 m/s
will be used if wind_rose_var = "ws", and a value of 4% will be
used if wind_rose_var = "ti".
legend_kwargs (dict, optional): Keyword arguments to be passed to
ax.legend(). Defaults to {"label": "Wind speed [m/s]"}.
Returns:
:py:class:`matplotlib.pyplot.axes`: A figure axes object containing
the plotted wind rose.
"""
if wind_rose_var not in {"ws", "ti"}:
raise ValueError(
'wind_rose_var must be either "ws" or "ti" for wind speed or turbulence intensity.'
)
# Get a aggregated wind_rose
if wind_rose_var == "ws":
if wind_rose_var_step is None:
wind_rose_var_step = 5.0
wind_rose_aggregated = self.downsample(wd_step, ws_step=wind_rose_var_step)
var_bins = wind_rose_aggregated.wind_speeds
freq_table = wind_rose_aggregated.freq_table.sum(2) # sum along TI dimension
else: # wind_rose_var == "ti"
if wind_rose_var_step is None:
wind_rose_var_step = 0.04
wind_rose_aggregated = self.downsample(wd_step, ti_step=wind_rose_var_step)
var_bins = wind_rose_aggregated.turbulence_intensities
freq_table = wind_rose_aggregated.freq_table.sum(1) # sum along wind speed dimension
wd_bins = wind_rose_aggregated.wind_directions
# Set up figure
if ax is None:
_, ax = plt.subplots(subplot_kw={"polar": True})
# Get a color array
color_array = plt.get_cmap(color_map, len(var_bins))
norm_wv = mpl.colors.Normalize(vmin=np.min(var_bins), vmax=np.max(var_bins))
sm_wv = mpl.cm.ScalarMappable(norm=norm_wv, cmap=color_array)
for wd_idx, wd in enumerate(wd_bins):
rects = []
freq_table_sub = freq_table[wd_idx, :].flatten()
for var_idx, ws in reversed(list(enumerate(var_bins))):
plot_val = freq_table_sub[: var_idx + 1].sum()
rects.append(
ax.bar(
np.radians(wd),
plot_val,
width=0.9 * np.radians(wd_step),
color=color_array(var_idx),
edgecolor="k",
)
)
# Configure the plot
ax.figure.colorbar(sm_wv, ax=ax, **legend_kwargs)
ax.figure.tight_layout()
ax.set_theta_direction(-1)
ax.set_theta_offset(np.pi / 2.0)
ax.set_theta_zero_location("N")
ax.set_xticks(np.arange(0, 2 * np.pi, np.pi / 4))
ax.set_xticklabels(["N", "NE", "E", "SE", "S", "SW", "W", "NW"])
return ax
[docs]
def plot_ti_over_ws(
self,
ax=None,
marker=".",
ls="-",
color="k",
):
"""
Plot the mean turbulence intensity against wind speed.
Args:
ax (:py:class:`matplotlib.pyplot.axes`, optional): The figure axes
on which the mean turbulence intensity is plotted. Defaults to None.
marker (str, optional): Scatter plot marker style. Defaults to ".".
ls (str, optional): Scatter plot line style. Defaults to "None".
color (str, optional): Scatter plot color. Defaults to "k".
Returns:
:py:class:`matplotlib.pyplot.axes`: A figure axes object containing
the plotted mean turbulence intensities as a function of wind speed.
"""
# TODO: Plot individual points and std. devs. of TI in addition to mean
# values
# Set up figure
if ax is None:
_, ax = plt.subplots()
# get mean TI for each wind speed by averaging along wind direction and
# TI dimensions
mean_ti_values = (self.ti_grid * self.freq_table).sum((0, 2)) / self.freq_table.sum((0, 2))
ax.plot(self.wind_speeds, mean_ti_values * 100, marker=marker, ls=ls, color=color)
ax.set_xlabel("Wind Speed (m/s)")
ax.set_ylabel("Mean Turbulence Intensity (%)")
ax.grid(True)
[docs]
def assign_value_using_wd_ws_ti_function(self, func, normalize=False):
"""
Use the passed in function to assign new values to the value table.
Args:
func (function): Function which accepts wind_directions as its
first argument, wind_speeds as its second argument, and
turbulence_intensities as its third argument and returns
values.
normalize (bool, optional): If True, the value array will be
normalized by the mean value. Defaults to False.
"""
self.value_table = func(self.wd_grid, self.ws_grid, self.ti_grid)
if normalize:
self.value_table /= np.sum(self.freq_table * self.value_table)
self._build_gridded_and_flattened_version()
[docs]
def assign_value_piecewise_linear(
self,
value_zero_ws=1.425,
ws_knee=4.5,
slope_1=0.0,
slope_2=-0.135,
limit_to_zero=False,
normalize=False,
):
"""
Define value as a continuous piecewise linear function of wind speed
with two line segments. The default parameters yield a value function
that approximates the normalized mean electricity price vs. wind speed
curve for the SPP market in the U.S. for years 2018-2020 from figure 7
in Simley et al. "The value of wake steering wind farm flow control in
US energy markets," Wind Energy Science, 2024.
https://doi.org/10.5194/wes-9-219-2024. This default value function is
constant at low wind speeds, then linearly decreases above 4.5 m/s.
Args:
value_zero_ws (float, optional): The value when wind speed is zero.
Defaults to 1.425.
ws_knee (float, optional): The wind speed separating line segments
1 and 2. Default = 4.5 m/s.
slope_1 (float, optional): The slope of the first line segment
(unit of value per m/s). Defaults to zero.
slope_2 (float, optional): The slope of the second line segment
(unit of value per m/s). Defaults to -0.135.
limit_to_zero (bool, optional): If True, negative values will be
set to zero. Defaults to False.
normalize (bool, optional): If True, the value array will be
normalized by the mean value. Defaults to False.
"""
def piecewise_linear_value_func(wind_directions, wind_speeds, turbulence_intensities):
value = np.zeros_like(wind_speeds, dtype=float)
value[wind_speeds < ws_knee] = (
slope_1 * wind_speeds[wind_speeds < ws_knee] + value_zero_ws
)
offset_2 = (slope_1 - slope_2) * ws_knee + value_zero_ws
value[wind_speeds >= ws_knee] = slope_2 * wind_speeds[wind_speeds >= ws_knee] + offset_2
if limit_to_zero:
value[value < 0] = 0.0
return value
self.assign_value_using_wd_ws_ti_function(piecewise_linear_value_func, normalize)
[docs]
def plot_value_over_ws(
self,
ax=None,
marker=".",
ls="None",
color="k",
):
"""
Scatter plot the value of the energy generated against wind speed.
Args:
ax (:py:class:`matplotlib.pyplot.axes`, optional): The figure axes
on which the value is plotted. Defaults to None.
marker (str, optional): Scatter plot marker style. Defaults to ".".
ls (str, optional): Scatter plot line style. Defaults to "None".
color (str, optional): Scatter plot color. Defaults to "k".
Returns:
:py:class:`matplotlib.pyplot.axes`: A figure axes object containing
the plotted value as a function of wind speed.
"""
# TODO: Plot mean and std. devs. of value in each ws bin in addition to
# individual points
# Set up figure
if ax is None:
_, ax = plt.subplots()
ax.plot(self.ws_flat, self.value_table_flat, marker=marker, ls=ls, color=color)
ax.set_xlabel("Wind Speed (m/s)")
ax.set_ylabel("Value")
ax.grid(True)
[docs]
@staticmethod
def read_csv_long(
file_path: str,
ws_col: str = "wind_speeds",
wd_col: str = "wind_directions",
ti_col: str = "turbulence_intensities",
freq_col: str | None = None,
sep: str = ",",
) -> WindTIRose:
"""
Read a long-formatted CSV file into the WindTIRose object. By long, what is meant
is that the wind speed, wind direction and turbulence intensities
combination is given for each row in the
CSV file. The wind speed, wind direction, and turbulence intensity are
given in separate columns, and the frequency of occurrence of each combination
is given in a separate column. The frequency column is optional, and if not
provided, uniform frequency of all bins is assumed.
Args:
file_path (str): Path to the CSV file.
ws_col (str): Name of the column in the CSV file that contains the wind speed
values. Defaults to 'wind_speeds'.
wd_col (str): Name of the column in the CSV file that contains the wind direction
values. Defaults to 'wind_directions'.
ti_col (str): Name of the column in the CSV file that contains
the turbulence intensity values.
freq_col (str): Name of the column in the CSV file that contains the frequency
values. Defaults to None in which case constant frequency assumed.
sep (str): Delimiter to use. Defaults to ','.
Returns:
WindRose: Wind rose object created from the CSV file.
"""
# Read in the CSV file
df = pd.read_csv(file_path, sep=sep)
# Check that the required columns are present
if ws_col not in df.columns:
raise ValueError(f"Column {ws_col} not found in CSV file")
if wd_col not in df.columns:
raise ValueError(f"Column {wd_col} not found in CSV file")
if ti_col not in df.columns:
raise ValueError(f"Column {ti_col} not found in CSV file")
if freq_col not in df.columns and freq_col is not None:
raise ValueError(f"Column {freq_col} not found in CSV file")
# Get the wind speed, wind direction, and turbulence intensity values
wind_directions = df[wd_col].values
wind_speeds = df[ws_col].values
turbulence_intensities = df[ti_col].values
if freq_col is not None:
freq_values = df[freq_col].values
else:
freq_values = np.ones(len(wind_speeds))
# Normalize freq_values
freq_values = freq_values / np.sum(freq_values)
# Get the unique values of wind directions and wind speeds
unique_wd = np.unique(wind_directions)
unique_ws = np.unique(wind_speeds)
unique_ti = np.unique(turbulence_intensities)
# Get the step side for wind direction and wind speed
wd_step = unique_wd[1] - unique_wd[0]
ws_step = unique_ws[1] - unique_ws[0]
ti_step = unique_ti[1] - unique_ti[0]
# Now use TimeSeries to create a wind rose
time_series = TimeSeries(wind_directions, wind_speeds, turbulence_intensities)
# Now build a new wind rose using the new steps
return time_series.to_WindTIRose(
wd_step=wd_step, ws_step=ws_step, ti_step=ti_step, bin_weights=freq_values
)
[docs]
class TimeSeries(WindDataBase):
"""
The TimeSeries class is used to drive FLORIS and optimization operations in
which the inflow is by a sequence of wind direction, wind speed and
turbulence intensity values. Each input of wind direction, wind speed, and
turbulence intensity can be assigned as an array of values or a single value.
At least one of wind_directions, wind_speeds, or turbulence_intensities must
be an array. If arrays are provided, they must be the same length as the
other arrays or the single values. If single values are provided, then an
array of the same length as the other arrays will be created with the single
value.
Args:
wind_directions (float, NDArrayFloat): Wind direction. Can be a single
value or an array of values.
wind_speeds (float, NDArrayFloat): Wind speed. Can be a single value or
an array of values.
turbulence_intensities (float, NDArrayFloat): Turbulence intensity. Can be
a single value or an array of values.
values (NDArrayFloat, optional): Values associated with each wind
direction, wind speed, and turbulence intensity. Defaults to None.
heterogeneous_map (HeterogeneousMap, optional): A HeterogeneousMap object to define
background heterogeneous inflow condition as a function
of wind direction and wind speed. Alternatively, a dictionary can be
passed in to define a HeterogeneousMap object. Defaults to None.
heterogeneous_inflow_config_by_wd (dict, optional): A dictionary containing the following
which can be used to define a heterogeneous_map object (note this parameter is kept
for backwards compatibility and is not recommended for use):
* 'x': A 1D NumPy array (size num_points) of x-coordinates (meters).
* 'y': A 1D NumPy array (size num_points) of y-coordinates (meters).
* 'speed_multipliers': A 2D NumPy array (size num_wd (or num_ws) x num_points)
of speed multipliers. If neither wind_directions nor wind_speeds are
defined, then this should be a single row array
* 'wind_directions': A 1D NumPy array (size num_wd) of wind directions (degrees).
Optional.
* 'wind_speeds': A 1D NumPy array (size num_ws) of wind speeds (m/s). Optional.
Defaults to None.
heterogeneous_inflow_config (dict, optional): A dictionary containing the following keys.
Defaults to None.
* 'speed_multipliers': A 2D NumPy array (size n_findex x num_points)
of speed multipliers.
* 'x': A 1D NumPy array (size num_points) of x-coordinates (meters).
* 'y': A 1D NumPy array (size num_points) of y-coordinates (meters).
"""
def __init__(
self,
wind_directions: float | NDArrayFloat,
wind_speeds: float | NDArrayFloat,
turbulence_intensities: float | NDArrayFloat,
values: NDArrayFloat | None = None,
heterogeneous_map: HeterogeneousMap | dict | None = None,
heterogeneous_inflow_config_by_wd: dict | None = None,
heterogeneous_inflow_config: dict | None = None,
):
# Check that wind_directions, wind_speeds, and turbulence_intensities are either numpy array
# of floats
if not isinstance(wind_directions, (float, np.ndarray)):
raise TypeError("wind_directions must be a float or a NumPy array")
if not isinstance(wind_speeds, (float, np.ndarray)):
raise TypeError("wind_speeds must be a float or a NumPy array")
if not isinstance(turbulence_intensities, (float, np.ndarray)):
raise TypeError("turbulence_intensities must be a float or a NumPy array")
# At least one of wind_directions, wind_speeds, or turbulence_intensities must be an array
if (
not isinstance(wind_directions, np.ndarray)
and not isinstance(wind_speeds, np.ndarray)
and not isinstance(turbulence_intensities, np.ndarray)
):
raise TypeError(
"At least one of wind_directions, wind_speeds, or "
" turbulence_intensities must be a NumPy array"
)
# For each of wind_directions, wind_speeds, and turbulence_intensities provided as
# an array, confirm they are the same length
if isinstance(wind_directions, np.ndarray) and isinstance(wind_speeds, np.ndarray):
if len(wind_directions) != len(wind_speeds):
raise ValueError(
"wind_directions and wind_speeds must be the same length if provided as arrays"
)
if isinstance(wind_directions, np.ndarray) and isinstance(
turbulence_intensities, np.ndarray
):
if len(wind_directions) != len(turbulence_intensities):
raise ValueError(
"wind_directions and turbulence_intensities must be "
"the same length if provided as arrays"
)
if isinstance(wind_speeds, np.ndarray) and isinstance(turbulence_intensities, np.ndarray):
if len(wind_speeds) != len(turbulence_intensities):
raise ValueError(
"wind_speeds and turbulence_intensities must be the "
"same length if provided as arrays"
)
# For each of wind_directions, wind_speeds, and turbulence_intensities
# provided as a single value, set them
# to be the same length as those passed in as arrays
if isinstance(wind_directions, float):
if isinstance(wind_speeds, np.ndarray):
wind_directions = np.full(len(wind_speeds), wind_directions)
elif isinstance(turbulence_intensities, np.ndarray):
wind_directions = np.full(len(turbulence_intensities), wind_directions)
if isinstance(wind_speeds, float):
if isinstance(wind_directions, np.ndarray):
wind_speeds = np.full(len(wind_directions), wind_speeds)
elif isinstance(turbulence_intensities, np.ndarray):
wind_speeds = np.full(len(turbulence_intensities), wind_speeds)
if isinstance(turbulence_intensities, float):
if isinstance(wind_directions, np.ndarray):
turbulence_intensities = np.full(len(wind_directions), turbulence_intensities)
elif isinstance(wind_speeds, np.ndarray):
turbulence_intensities = np.full(len(wind_speeds), turbulence_intensities)
# If values is not None, must be same length as wind_directions/wind_speeds/
if values is not None:
if len(wind_directions) != len(values):
raise ValueError("wind_directions and values must be the same length")
self.wind_directions = wind_directions
self.wind_speeds = wind_speeds
self.turbulence_intensities = turbulence_intensities
self.values = values
# Check that at most one of heterogeneous_inflow_config_by_wd,
# heterogeneous_map and heterogeneous_inflow_config is not None
if (
sum(
[
heterogeneous_inflow_config_by_wd is not None,
heterogeneous_map is not None,
heterogeneous_inflow_config is not None,
]
)
> 1
):
raise ValueError(
"Only one of heterogeneous_inflow_config_by_wd, "
+ "heterogeneous_map, and heterogeneous_inflow_config can be not None."
)
# if heterogeneous_inflow_config is not None, then the speed_multipliers
# must be the same length as wind_directions
# in the 0th dimension
if heterogeneous_inflow_config is not None:
if len(heterogeneous_inflow_config["speed_multipliers"]) != len(wind_directions):
raise ValueError("speed_multipliers must be the same length as wind_directions")
# Check heterogeneous_inflow_config and save
self.check_heterogeneous_inflow_config(heterogeneous_inflow_config)
self.heterogeneous_inflow_config = heterogeneous_inflow_config
else:
self.heterogeneous_inflow_config = None
# If heterogeneous_inflow_config_by_wd is not None, then create a HeterogeneousMap object
# using the dictionary
if heterogeneous_inflow_config_by_wd is not None:
# TODO: In future, add deprectation warning for this parameter here
self.heterogeneous_map = HeterogeneousMap(**heterogeneous_inflow_config_by_wd)
# Else if heterogeneous_map is not None
elif heterogeneous_map is not None:
# If heterogeneous_map is a dictionary, then create a HeterogeneousMap object
if isinstance(heterogeneous_map, dict):
self.heterogeneous_map = HeterogeneousMap(**heterogeneous_map)
# Else if heterogeneous_map is a HeterogeneousMap object, then save it
elif isinstance(heterogeneous_map, HeterogeneousMap):
self.heterogeneous_map = heterogeneous_map
# Else raise an error
else:
raise ValueError(
"heterogeneous_map must be a HeterogeneousMap object or a dictionary."
)
# Else if neither heterogeneous_map nor heterogeneous_inflow_config_by_wd are defined,
# then set heterogeneous_map to None
else:
self.heterogeneous_map = None
# Record findex
self.n_findex = len(self.wind_directions)
[docs]
def unpack(self):
"""
Unpack the time series data in a manner consistent with wind rose unpack
"""
# to match wind_rose, make a uniform frequency
uniform_frequency = np.ones_like(self.wind_directions)
uniform_frequency = uniform_frequency / uniform_frequency.sum()
# If heterogeneous_map is not None, then update
# heterogeneous_inflow_config to match wind_directions_unpack
if self.heterogeneous_map is not None:
heterogeneous_inflow_config = self.heterogeneous_map.get_heterogeneous_inflow_config(
wind_directions=self.wind_directions, wind_speeds=self.wind_speeds
)
else:
heterogeneous_inflow_config = self.heterogeneous_inflow_config
return (
self.wind_directions,
self.wind_speeds,
self.turbulence_intensities,
uniform_frequency,
self.values,
heterogeneous_inflow_config,
)
def _wrap_wind_directions_near_360(self, wind_directions, wd_step):
"""
Wraps the wind directions using `wd_step` to produce a wrapped version
where values between [360 - wd_step/2.0, 360] get mapped to negative numbers
for binning.
Args:
wind_directions (NDArrayFloat): NumPy array of wind directions.
wd_step (float): Step size for wind direction.
Returns:
NDArrayFloat: Wrapped version of wind directions.
"""
wind_directions_wrapped = wind_directions.copy()
mask = wind_directions_wrapped >= 360 - wd_step / 2.0
wind_directions_wrapped[mask] = wind_directions_wrapped[mask] - 360.0
return wind_directions_wrapped
[docs]
def assign_ti_using_wd_ws_function(self, func):
"""
Use the passed in function to new assign values to turbulence_intensities
Args:
func (function): Function which accepts wind_directions as its
first argument and wind_speeds as second argument and returns
turbulence_intensities
"""
self.turbulence_intensities = func(self.wind_directions, self.wind_speeds)
[docs]
def assign_ti_using_IEC_method(self, Iref=0.07, offset=3.8):
"""
Define TI as a function of wind speed by specifying an Iref and offset
value as in the normal turbulence model in the IEC 61400-1 standard
Args:
Iref (float): Reference turbulence level, defined as the expected
value of TI at 15 m/s. Default = 0.07. Note this value is
lower than the values of Iref for turbulence classes A, B, and
C in the IEC standard (0.16, 0.14, and 0.12, respectively), but
produces TI values more in line with those typically used in
FLORIS. When the default Iref and offset are used, the TI at
8 m/s is 8.6%.
offset (float): Offset value to equation. Default = 3.8, as defined
in the IEC standard to give the expected value of TI for
each wind speed.
"""
if (Iref < 0) or (Iref > 1):
raise ValueError("Iref must be >= 0 and <=1")
def iref_func(wind_directions, wind_speeds):
sigma_1 = Iref * (0.75 * wind_speeds + offset)
return sigma_1 / wind_speeds
self.assign_ti_using_wd_ws_function(iref_func)
[docs]
def assign_value_using_wd_ws_function(self, func, normalize=False):
"""
Use the passed in function to assign new values to the value table.
Args:
func (function): Function which accepts wind_directions as its
first argument and wind_speeds as second argument and returns
values.
normalize (bool, optional): If True, the value array will be
normalized by the mean value. Defaults to False.
"""
self.values = func(self.wind_directions, self.wind_speeds)
if normalize:
self.values /= np.mean(self.values)
[docs]
def assign_value_piecewise_linear(
self,
value_zero_ws=1.425,
ws_knee=4.5,
slope_1=0.0,
slope_2=-0.135,
limit_to_zero=False,
normalize=False,
):
"""
Define value as a continuous piecewise linear function of wind speed
with two line segments. The default parameters yield a value function
that approximates the normalized mean electricity price vs. wind speed
curve for the SPP market in the U.S. for years 2018-2020 from figure 7
in Simley et al. "The value of wake steering wind farm flow control in
US energy markets," Wind Energy Science, 2024.
https://doi.org/10.5194/wes-9-219-2024. This default value function is
constant at low wind speeds, then linearly decreases above 4.5 m/s.
Args:
value_zero_ws (float, optional): The value when wind speed is zero.
Defaults to 1.425.
ws_knee (float, optional): The wind speed separating line segments
1 and 2. Default = 4.5 m/s.
slope_1 (float, optional): The slope of the first line segment
(unit of value per m/s). Defaults to zero.
slope_2 (float, optional): The slope of the second line segment
(unit of value per m/s). Defaults to -0.135.
limit_to_zero (bool, optional): If True, negative values will be
set to zero. Defaults to False.
normalize (bool, optional): If True, the value array will be
normalized by the mean value. Defaults to False.
"""
def piecewise_linear_value_func(wind_directions, wind_speeds):
value = np.zeros_like(wind_speeds, dtype=float)
value[wind_speeds < ws_knee] = (
slope_1 * wind_speeds[wind_speeds < ws_knee] + value_zero_ws
)
offset_2 = (slope_1 - slope_2) * ws_knee + value_zero_ws
value[wind_speeds >= ws_knee] = slope_2 * wind_speeds[wind_speeds >= ws_knee] + offset_2
if limit_to_zero:
value[value < 0] = 0.0
return value
self.assign_value_using_wd_ws_function(piecewise_linear_value_func, normalize)
[docs]
def to_WindRose(self, wd_step=2.0, ws_step=1.0, wd_edges=None, ws_edges=None, bin_weights=None):
"""
Converts the TimeSeries data to a WindRose.
Args:
wd_step (float, optional): Step size for wind direction (default is 2.0).
ws_step (float, optional): Step size for wind speed (default is 1.0).
wd_edges (NDArrayFloat, optional): Custom wind direction edges. Defaults to None.
ws_edges (NDArrayFloat, optional): Custom wind speed edges. Defaults to None.
bin_weights (NDArrayFloat, optional): Bin weights for resampling. Note these
are primarily used by the downsample() method.
Defaults to None.
Returns:
WindRose: A WindRose object based on the TimeSeries data.
Notes:
- If `wd_edges` is defined, it uses it to produce the bin centers.
- If `wd_edges` is not defined, it determines `wd_edges` from the step and data.
- If `ws_edges` is defined, it uses it for wind speed edges.
- If `ws_edges` is not defined, it determines `ws_edges` from the step and data.
"""
# If wd_edges is defined, then use it to produce the bin centers
if wd_edges is not None:
wd_step = wd_edges[1] - wd_edges[0]
# use wd_step to produce a wrapped version of wind_directions
wind_directions_wrapped = self._wrap_wind_directions_near_360(
self.wind_directions, wd_step
)
# Else, determine wd_edges from the step and data
else:
wd_edges = np.arange(0.0 - wd_step / 2.0, 360.0, wd_step)
# use wd_step to produce a wrapped version of wind_directions
wind_directions_wrapped = self._wrap_wind_directions_near_360(
self.wind_directions, wd_step
)
# Only keep the range with values in it
wd_edges = wd_edges[wd_edges + wd_step > wind_directions_wrapped.min()]
wd_edges = wd_edges[wd_edges - wd_step <= wind_directions_wrapped.max()]
# Define the centers from the edges
wd_centers = wd_edges[:-1] + wd_step / 2.0
# Repeat for wind speeds
if ws_edges is not None:
ws_step = ws_edges[1] - ws_edges[0]
else:
ws_edges = np.arange(0.0 - ws_step / 2.0, 50.0, ws_step)
# Only keep the range with values in it
ws_edges = ws_edges[ws_edges + ws_step > self.wind_speeds.min()]
ws_edges = ws_edges[ws_edges - ws_step <= self.wind_speeds.max()]
# Define the centers from the edges
ws_centers = ws_edges[:-1] + ws_step / 2.0
# Now use pandas to get the tables need for wind rose
df = pd.DataFrame(
{
"wd": wind_directions_wrapped,
"ws": self.wind_speeds,
"freq_val": np.ones(len(wind_directions_wrapped)),
}
)
# If bin_weights are passed in, apply these to the frequency
# this is mostly used when resampling the wind rose
if bin_weights is not None:
df = df.assign(freq_val=df["freq_val"] * bin_weights)
# Add turbulence intensities to dataframe
df = df.assign(turbulence_intensities=self.turbulence_intensities)
# If values is not none, add to dataframe
if self.values is not None:
df = df.assign(values=self.values)
# Bin wind speed and wind direction and then group things up
df = (
df.assign(
wd_bin=pd.cut(
df.wd, bins=wd_edges, labels=wd_centers, right=False, include_lowest=True
)
)
.assign(
ws_bin=pd.cut(
df.ws, bins=ws_edges, labels=ws_centers, right=False, include_lowest=True
)
)
.drop(["wd", "ws"], axis=1)
)
# Convert wd_bin and ws_bin to categoricals to ensure all combinations
# are considered and then group
wd_cat = CategoricalDtype(categories=wd_centers, ordered=True)
ws_cat = CategoricalDtype(categories=ws_centers, ordered=True)
df = (
df.assign(wd_bin=df["wd_bin"].astype(wd_cat))
.assign(ws_bin=df["ws_bin"].astype(ws_cat))
.groupby(["wd_bin", "ws_bin"], observed=False)
.agg(["sum", "mean"])
)
# Flatten and combine levels using an underscore
df.columns = ["_".join(col) for col in df.columns]
# Collect the frequency table and reshape
freq_table = df["freq_val_sum"].values.copy()
freq_table = freq_table / freq_table.sum()
freq_table = freq_table.reshape((len(wd_centers), len(ws_centers)))
# Compute the TI table
ti_table = df["turbulence_intensities_mean"].values.copy()
ti_table = ti_table.reshape((len(wd_centers), len(ws_centers)))
# If values is not none, compute the table
if self.values is not None:
value_table = df["values_mean"].values.copy()
value_table = value_table.reshape((len(wd_centers), len(ws_centers)))
else:
value_table = None
# Return a WindRose
return WindRose(
wd_centers,
ws_centers,
ti_table,
freq_table,
value_table,
self.heterogeneous_map,
)
[docs]
def to_WindTIRose(
self,
wd_step=2.0,
ws_step=1.0,
ti_step=0.02,
wd_edges=None,
ws_edges=None,
ti_edges=None,
bin_weights=None,
):
"""
Converts the TimeSeries data to a WindTIRose.
Args:
wd_step (float, optional): Step size for wind direction (default is 2.0).
ws_step (float, optional): Step size for wind speed (default is 1.0).
ti_step (float, optional): Step size for turbulence intensity (default is 0.02).
wd_edges (NDArrayFloat, optional): Custom wind direction edges. Defaults to None.
ws_edges (NDArrayFloat, optional): Custom wind speed edges. Defaults to None.
ti_edges (NDArrayFloat, optional): Custom turbulence intensity
edges. Defaults to None.
bin_weights (NDArrayFloat, optional): Bin weights for resampling. Note these
are primarily used by the downsample() method.
Defaults to None.
Returns:
WindRose: A WindTIRose object based on the TimeSeries data.
Notes:
- If `wd_edges` is defined, it uses it to produce the wind direction bin edges.
- If `wd_edges` is not defined, it determines `wd_edges` from the step and data.
- If `ws_edges` is defined, it uses it for wind speed edges.
- If `ws_edges` is not defined, it determines `ws_edges` from the step and data.
- If `ti_edges` is defined, it uses it for turbulence intensity edges.
- If `ti_edges` is not defined, it determines `ti_edges` from the step and data.
"""
# If wd_edges is defined, then use it to produce the bin centers
if wd_edges is not None:
wd_step = wd_edges[1] - wd_edges[0]
# use wd_step to produce a wrapped version of wind_directions
wind_directions_wrapped = self._wrap_wind_directions_near_360(
self.wind_directions, wd_step
)
# Else, determine wd_edges from the step and data
else:
wd_edges = np.arange(0.0 - wd_step / 2.0, 360.0, wd_step)
# use wd_step to produce a wrapped version of wind_directions
wind_directions_wrapped = self._wrap_wind_directions_near_360(
self.wind_directions, wd_step
)
# Only keep the range with values in it
wd_edges = wd_edges[wd_edges + wd_step > wind_directions_wrapped.min()]
wd_edges = wd_edges[wd_edges - wd_step <= wind_directions_wrapped.max()]
# Define the centers from the edges
wd_centers = wd_edges[:-1] + wd_step / 2.0
# Repeat for wind speeds
if ws_edges is not None:
ws_step = ws_edges[1] - ws_edges[0]
else:
ws_edges = np.arange(0.0 - ws_step / 2.0, 50.0, ws_step)
# Only keep the range with values in it
ws_edges = ws_edges[ws_edges + ws_step > self.wind_speeds.min()]
ws_edges = ws_edges[ws_edges - ws_step <= self.wind_speeds.max()]
# Define the centers from the edges
ws_centers = ws_edges[:-1] + ws_step / 2.0
# Repeat for turbulence intensities
if ti_edges is not None:
ti_step = ti_edges[1] - ti_edges[0]
else:
ti_edges = np.arange(0.0 - ti_step / 2.0, 1.0, ti_step)
# Only keep the range with values in it
ti_edges = ti_edges[ti_edges + ti_step > self.turbulence_intensities.min()]
ti_edges = ti_edges[ti_edges - ti_step <= self.turbulence_intensities.max()]
# Define the centers from the edges
ti_centers = ti_edges[:-1] + ti_step / 2.0
# Now use pandas to get the tables need for wind rose
df = pd.DataFrame(
{
"wd": wind_directions_wrapped,
"ws": self.wind_speeds,
"ti": self.turbulence_intensities,
"freq_val": np.ones(len(wind_directions_wrapped)),
}
)
# If bin_weights are passed in, apply these to the frequency
# this is mostly used when resampling the wind rose
if bin_weights is not None:
df = df.assign(freq_val=df["freq_val"] * bin_weights)
# If values is not none, add to dataframe
if self.values is not None:
df = df.assign(values=self.values)
# Bin wind speed, wind direction, and turbulence intensity and then group things up
df = (
df.assign(
wd_bin=pd.cut(
df.wd, bins=wd_edges, labels=wd_centers, right=False, include_lowest=True
)
)
.assign(
ws_bin=pd.cut(
df.ws, bins=ws_edges, labels=ws_centers, right=False, include_lowest=True
)
)
.assign(
ti_bin=pd.cut(
df.ti, bins=ti_edges, labels=ti_centers, right=False, include_lowest=True
)
)
.drop(["wd", "ws", "ti"], axis=1)
)
# Convert wd_bin, ws_bin, and ti_bin to categoricals to ensure all
# combinations are considered and then group
wd_cat = CategoricalDtype(categories=wd_centers, ordered=True)
ws_cat = CategoricalDtype(categories=ws_centers, ordered=True)
ti_cat = CategoricalDtype(categories=ti_centers, ordered=True)
df = (
df.assign(wd_bin=df["wd_bin"].astype(wd_cat))
.assign(ws_bin=df["ws_bin"].astype(ws_cat))
.assign(ti_bin=df["ti_bin"].astype(ti_cat))
.groupby(["wd_bin", "ws_bin", "ti_bin"], observed=False)
.agg(["sum", "mean"])
)
# Flatten and combine levels using an underscore
df.columns = ["_".join(col) for col in df.columns]
# Collect the frequency table and reshape
freq_table = df["freq_val_sum"].values.copy()
freq_table = freq_table / freq_table.sum()
freq_table = freq_table.reshape((len(wd_centers), len(ws_centers), len(ti_centers)))
# If values is not none, compute the table
if self.values is not None:
value_table = df["values_mean"].values.copy()
value_table = value_table.reshape((len(wd_centers), len(ws_centers), len(ti_centers)))
else:
value_table = None
# Return a WindTIRose
return WindTIRose(
wd_centers,
ws_centers,
ti_centers,
freq_table,
value_table,
self.heterogeneous_map,
)
[docs]
class WindRoseWRG(WindDataBase):
"""
The WindRoseWRG class is a WindData object the represents a wind resource grid (WRG) file
to FLORIS. As a WindData object it can be passed to the FlorisModel.set method. A WRG file
represents a wind resource as a grid of points where each point has a separate wind rose define
by the frequency of each wind direction and the Weibull parameters for each wind direction.
WindRoseWRG objects are provided the layout of a wind farm and computes a wind rose at
each point in the layout. The wind rose at each point is computed by interpolating the weibull
parameter in the WRG file to the point in the layout and using them to compute a WindRose
object. Each WindRose object shares wind direction and wind speed, only the frequencies differ.
When running a FlorisModel with a WindRoseWRG object, most behaviors are the same
except functions which compute an expected value, use separate frequencies for each
turbine to weight the individual power bins.
Args:
filename (str): The name of the WRG file to read.
wd_step (float, optional): Step size to use resampling the wind directions given by the WRG
file. If None, wd_step and wind_directions are set by the number of
sectors in the WRG file. Defaults to None.
wind_speeds (NDArrayFloat, optional): Wind speeds to use in the wind rose. Defaults to
np.arange(0.0, 26.0, 1.0).
ti_table (float, optional): Turbulence intensities table to use for each WindRose object.
As in the WindRose ti_table, this can be a single value or an array of values. If an
array of values is provided, it must be (len(wind_directions) x len(wind_speeds)).
Defaults to 0.06.
"""
def __init__(
self, filename, wd_step=None, wind_speeds=np.arange(0.0, 26.0, 1.0), ti_table=0.06
):
# Read in the WRG file
self.filename = filename
self.read_wrg_file(filename)
# If wd_step is None, then use the wind directions in the WRG file
if wd_step is None:
self.wind_directions = self._wind_directions_wrg_file
self.wd_step = self.wind_directions[1] - self.wind_directions[0]
else:
self.wind_directions = np.arange(0.0, 360.0, wd_step)
self.wd_step = wd_step
# Initialize the layouts which will need to be specified
self.layout_x = None
self.layout_y = None
# Save the wind speeds and ti_table
self.wind_speeds = wind_speeds
self.ti_table = ti_table
# Initialize the flat arrays, these will depend on the specified wind speeds
self.wd_flat = None
self.ws_flat = None
self.non_zero_freq_mask = None
[docs]
def read_wrg_file(self, filename):
"""
Read the contents of a WRG file and store the data in the object.
Args:
filename (str): The name of the WRG file to read.
"""
# Read the file into data
with open(filename, "r") as f:
data = f.readlines()
# Read the header
header = data[0].split()
self.nx = int(header[0])
self.ny = int(header[1])
self.xmin = float(header[2])
self.ymin = float(header[3])
self.grid_size = float(header[4])
# The grid of points is implied by the values above
self.x_array = np.arange(self.nx) * self.grid_size + self.xmin
self.y_array = np.arange(self.ny) * self.grid_size + self.ymin
# The number of grid points (n_gid) is the product of the number of points in x and y
self.n_gid = self.nx * self.ny
# Finally get the number of sectors from the first line after the header
self.n_sectors = int(data[1][70:72])
# The wind directions are implied by the number of sectors
self._wind_directions_wrg_file = np.arange(0.0, 360.0, 360.0 / self.n_sectors)
# Initialize the data arrays which have the same number of
# elements as the number of grid points
x_gid = np.zeros(self.n_gid)
y_gid = np.zeros(self.n_gid)
z_gid = np.zeros(self.n_gid)
h_gid = np.zeros(self.n_gid)
# Initialize the data arrays which are n_gid x n_sectors
sector_freq_gid = np.zeros((self.n_gid, self.n_sectors))
weibull_A_gid = np.zeros((self.n_gid, self.n_sectors))
weibull_k_gid = np.zeros((self.n_gid, self.n_sectors))
# Loop through the data and extract the values
for gid in range(self.n_gid):
line = data[1 + gid]
x_gid[gid] = float(line[10:20])
y_gid[gid] = float(line[20:30])
z_gid[gid] = float(line[30:38])
h_gid[gid] = float(line[38:43])
for sector in range(self.n_sectors):
# The frequency of the wind in this sector is in probablility * 1000
sector_freq_gid[gid, sector] = (
float(line[72 + sector * 13 : 76 + sector * 13]) / 1000.0
)
# The A and k parameters are in the next 10 characters, with A stored * 10
# and k stored * 100
weibull_A_gid[gid, sector] = float(line[76 + sector * 13 : 80 + sector * 13]) / 10.0
weibull_k_gid[gid, sector] = (
float(line[80 + sector * 13 : 85 + sector * 13]) / 100.0
)
# Save the x_gid and y_gid form for iteration in het map
self.x_gid = x_gid
self.y_gid = y_gid
self.weibull_A_gid = weibull_A_gid
self.weibull_k_gid = weibull_k_gid
# Save a single value of z and h for the entire grid
self.z = z_gid[0]
self.h = h_gid[0]
# Index the by sector data by x and y
self.sector_freq = np.zeros((self.nx, self.ny, self.n_sectors))
self.weibull_A = np.zeros((self.nx, self.ny, self.n_sectors))
self.weibull_k = np.zeros((self.nx, self.ny, self.n_sectors))
for x_idx, x in enumerate(self.x_array):
for y_idx, y in enumerate(self.y_array):
# Find the indices when x_gid and y_gid are equal to x and y
idx = np.where((x_gid == x) & (y_gid == y))[0]
# Assign the data to the correct location
self.sector_freq[x_idx, y_idx, :] = sector_freq_gid[idx, :]
self.weibull_A[x_idx, y_idx, :] = weibull_A_gid[idx, :]
self.weibull_k[x_idx, y_idx, :] = weibull_k_gid[idx, :]
# Build the interpolant function lists
self.interpolant_sector_freq = self._build_interpolant_function_list(
self.x_array, self.y_array, self.n_sectors, self.sector_freq
)
self.interpolant_weibull_A = self._build_interpolant_function_list(
self.x_array, self.y_array, self.n_sectors, self.weibull_A
)
self.interpolant_weibull_k = self._build_interpolant_function_list(
self.x_array, self.y_array, self.n_sectors, self.weibull_k
)
def __str__(self) -> str:
"""
Return a string representation of the WindRose object
"""
return (
f"WindResourceGrid with {self.nx} x {self.ny} grid points, "
f"min x: {self.xmin}, min y: {self.ymin}, grid size: {self.grid_size}, "
f"z: {self.z}, h: {self.h}, {self.n_sectors} sectors\n"
f"Wind directions in file: {self._wind_directions_wrg_file}\n"
f"Wind directions: {self.wind_directions}\n"
f"Wind speeds: {self.wind_speeds}\n"
f"ti_table: {self.ti_table}"
)
def _build_interpolant_function_list(self, x, y, n_sectors, data):
"""
Build a list of interpolant functions for the data. It is assumed that the function
should return a list of interpolant functions, length n_sectors.
Args:
x (np.array): The x values of the data, length nx.
y (np.array): The y values of the data, length ny.
n_sectors (int): The number of sectors.
data (np.array): The data to interpolate, shape (nx, ny, n_sectors).
Returns:
list: A list of interpolant functions, length n_sectors.
"""
function_list = []
for sector in range(n_sectors):
function_list.append(
RegularGridInterpolator(
(x, y),
data[:, :, sector],
bounds_error=False,
fill_value=None,
)
)
return function_list
def _interpolate_data(self, x, y, interpolant_function_list):
"""
Interpolate the data at a given x, y location using the interpolant function list.
Args:
x (float): The x location to interpolate.
y (float): The y location to interpolate.
interpolant_function_list (list): A list of interpolant functions.
Returns:
list: A list of interpolated data, length n_sectors.
"""
# Check if x and y are within the bounds of the self.x_array and self.y_array, if
# so use the nearest method, otherwise use the linear method of interpolation
if (
x < self.x_array[0]
or x > self.x_array[-1]
or y < self.y_array[0]
or y > self.y_array[-1]
):
method = "nearest"
else:
method = "linear"
result = np.zeros(self.n_sectors)
for sector in range(self.n_sectors):
result[sector] = interpolant_function_list[sector]((x, y), method=method)
return result
def _weibull_cumulative(self, x, a, k):
"""
Calculate the Weibull cumulative distribution function.
Args:
x (np.array): The wind speed values.
a (np.array): The Weibull A parameter values.
k (np.array): The Weibull k parameter values.
Returns:
np.array: The cumulative distribution function values.
"""
exponent = -((x / a) ** k)
result = 1.0 - np.exp(exponent)
# Where x is less than 0, the result should be 0
result[x < 0] = 0.0
return result
# Original code from PJ Stanley
# if x >= 0.0:
# exponent = -(x / a) ** k
# return 1.0 - np.exp(exponent)
# else:
# return 0.0
def _generate_wind_speed_frequencies_from_weibull(self, A, k, wind_speeds=None):
"""
Generate the wind speed frequencies from the Weibull parameters. Use the
cumulative form of the function and calculate the probability of the wind speed
in a given bin via the difference in the cumulative function at the bin edges.
Args:
A (np.array): The Weibull A parameter.
k (np.array): The Weibull k parameter.
wind_speeds (np.array): The wind speeds to calculate the frequencies for.
If None, the frequencies are calculated for 0 to 25 m/s in 1 m/s increments.
Default is None.
Returns:
np.array: The wind speed frequencies.
"""
if wind_speeds is None:
wind_speeds = self.wind_speeds
ws_steps = np.diff(wind_speeds)
if not np.all(np.isclose(ws_steps, ws_steps[0])):
raise ValueError("wind_speeds must be equally spaced.")
else:
ws_step = ws_steps[0]
# Define the wind speed edges (not half-open interval in np.arange)
wind_speed_edges = np.arange(
wind_speeds[0] - ws_step / 2, wind_speeds[-1] + ws_step, ws_step
)
# Get the cumulative distribution function at the edges
cdf_edges = self._weibull_cumulative(wind_speed_edges, A, k)
# The frequency is the difference in the cumulative distribution function
# at the edges
# NOTE: The probability mass associated to each discrete wind speed (ws) is taken as the
# cumulative mass under the continuous Weibull distribution from ws - ws_step/2 to
# ws + ws_step/2, where ws_step is the step between the provided wind_speeds.
freq = cdf_edges[1:] - cdf_edges[:-1]
# Normalize the frequency
freq = freq / freq.sum()
return wind_speeds, freq
[docs]
def get_wind_rose_at_point(self, x, y, wind_directions=None, wind_speeds=None, ti_table=0.06):
"""
Get the wind rose at a given x, y location. Interpolate the parameters to the point
and then generate the wind rose.
Args:
x (float): The x location to interpolate.
y (float): The y location to interpolate.
wind_directions (np.array): The wind directions to calculate the frequencies for.
If None, use self.wind_directions. Default is None.
wind_speeds (np.array): The wind speeds to calculate the frequencies for.
If None, use self.wind_speeds. Default is None.
ti_table (float): The ti_table to use in the wind rose.
Default is 0.06.
"""
if wind_speeds is None:
wind_speeds = self.wind_speeds
# If wind directions is None, use the values stored
if wind_directions is None:
wind_directions = self.wind_directions
wd_step = self.wd_step
else:
# Calculate wd_step for these directions
wd_step = wind_directions[1] - wind_directions[0]
# Get the interpolated data
sector_freq = self._interpolate_data(x, y, self.interpolant_sector_freq)
weibull_A = self._interpolate_data(x, y, self.interpolant_weibull_A)
weibull_k = self._interpolate_data(x, y, self.interpolant_weibull_k)
# Initialize the freq_table
freq_table = np.zeros((self.n_sectors, len(wind_speeds)))
# First fill in the rows of the table using the weibull distributions,
# weighted by the sector freq
for sector in range(self.n_sectors):
wind_speeds, freq = self._generate_wind_speed_frequencies_from_weibull(
weibull_A[sector], weibull_k[sector], wind_speeds=wind_speeds
)
freq_table[sector, :] = sector_freq[sector] * freq
# Normalize the table
freq_table = freq_table / freq_table.sum()
# First build the wind rose using the wind directions in the wrg file
wind_rose = WindRose(
wind_directions=self._wind_directions_wrg_file,
wind_speeds=wind_speeds,
freq_table=freq_table,
ti_table=ti_table,
compute_zero_freq_occurrence=True,
)
# Now upsample or downsample the wind rose to the specified wind directions
if wd_step == (self._wind_directions_wrg_file[1] - self._wind_directions_wrg_file[0]):
# If the wind directions are the same, return the wind rose
return wind_rose
elif wd_step < (self._wind_directions_wrg_file[1] - self._wind_directions_wrg_file[0]):
# If the wind directions are smaller, upsample
return wind_rose.upsample(wd_step)
else:
# If the wind directions are larger, downsample
return wind_rose.downsample(wd_step)
[docs]
def set_wd_step(self, wd_step):
"""
Set the wind directions for the WindRoseWRG object.
Args:
wind_directions (np.array): The wind directions to use for the wind roses.
"""
self.wind_directions = np.arange(0.0, 360.0, wd_step)
self.wd_step = wd_step
# Update the wind roses if the layout has been set
if self.layout_x is not None:
self._update_wind_roses()
[docs]
def set_wind_speeds(self, wind_speeds):
"""
Set the wind speeds for the WindRoseWRG object.
Args:
wind_speeds (np.array): The wind speeds to use for the wind roses.
"""
self.wind_speeds = wind_speeds
# Update the wind roses if the layout has been set
if self.layout_x is not None:
self._update_wind_roses()
[docs]
def set_ti_table(self, ti_table):
"""
Set the fixed turbulence intensity value for the WindRoseWRG object.
Args:
ti_table (float): The ti_table value to use in the wind roses.
"""
self.ti_table = ti_table
# Update the wind roses if the layout has been set
if self.layout_x is not None:
self._update_wind_roses()
[docs]
def set_layout(self, layout_x, layout_y):
"""
Set the layout for the WindRoseWRG object.
Args:
layout_x (np.array): The x coordinates of the layout.
layout_y (np.array): The y coordinates of the layout.
"""
# Confirm that layout_x, layout_y, and wind_roses are the same length
if len(layout_x) != len(layout_y):
raise ValueError("layout_x and layout_y must be the same length")
# If the current layout is the same as the new layout, return
if self.layout_x is not None and self.layout_y is not None:
if np.allclose(np.array(layout_x), self.layout_x) and np.allclose(
np.array(layout_y), self.layout_y
):
return
# Save the layouts
self.layout_x = np.array(layout_x)
self.layout_y = np.array(layout_y)
# Update the wind roses
self._update_wind_roses()
def _update_wind_roses(self):
# Initialize the list of wind roses
self.wind_roses = []
# Loop through the turbines and get the wind rose at each location
for i in range(len(self.layout_x)):
wind_rose = self.get_wind_rose_at_point(
self.layout_x[i],
self.layout_y[i],
wind_directions=self.wind_directions,
wind_speeds=self.wind_speeds,
ti_table=self.ti_table,
)
self.wind_roses.append(wind_rose)
# Save also the wd_flat and ws_flat from the first wind rose as this could be needed
# for unpacking and non_zero_freq_mask
self.wd_flat = self.wind_roses[0].wd_flat
self.ws_flat = self.wind_roses[0].ws_flat
self.non_zero_freq_mask = self.wind_roses[0].non_zero_freq_mask
[docs]
def unpack(self):
"""
Implement the unpack method for WindRoseByTurbine by
calling the unpack method for each of the WindRose objects in wind_roses.
Mose of the variables can be passed as is but freq_table_unpack are combined
and stacked along the 1th axis
Returns:
Tuple: Tuple containing the unpacked wind rose data.
"""
if self.layout_x is None:
raise ValueError("WindRoseByTurbine must be initialized to a layout before unpacking")
# Initialize freq_table_unpack
freq_table_unpack = np.zeros((len(self.wd_flat), len(self.layout_x)))
# Loop over remaining wind roses and stack freq_table_unpack
for i, wind_rose in enumerate(self.wind_roses):
(
wind_directions_unpack,
wind_speeds_unpack,
ti_table_unpack,
freq_table_unpack_0,
value_table_unpack,
heterogeneous_inflow_config,
) = wind_rose.unpack()
freq_table_unpack[:, i] = freq_table_unpack_0
return (
wind_directions_unpack,
wind_speeds_unpack,
ti_table_unpack,
freq_table_unpack,
value_table_unpack,
heterogeneous_inflow_config,
)
[docs]
def plot_wind_roses(
self,
axarr=None,
wd_step=None,
ws_step=None,
):
"""
Plot the wind roses for each turbine in the WindRoseByTurbine object.
Args:
axarr (NDArrayAxes, optional): Array of axes to plot the wind roses on.
Defaults to None. Must have length equal to the number of wind roses.
wd_step (float, optional): Step size for wind direction. Defaults to None.
ws_step (float, optional): Step size for wind speed. Defaults to None.
"""
if self.layout_x is None:
raise ValueError("WindRoseByTurbine must be initialized to a layout before plotting")
# If axarr is not defined, create a new figure
if axarr is None:
_, axarr = plt.subplots(1, len(self.wind_roses), subplot_kw={"polar": True})
# Test that axarr is the correct length
if len(axarr) != len(self.wind_roses):
raise ValueError("axarr must have the same length as the number of wind roses")
# Plot the wind roses for each turbine
for i, wind_rose in enumerate(self.wind_roses):
wind_rose.plot(ax=axarr[i], wd_step=wd_step, ws_step=ws_step)
axarr[i].set_title(f"Turbine {i}\n ({self.layout_x[i]:.1f}, {self.layout_y[i]:.1f})")
[docs]
def get_heterogeneous_wind_rose(
self,
fmodel,
wind_speeds=None,
x_loc=None,
y_loc=None,
representative_wind_speed=8.0,
):
"""
Get the heterogeneous map at each location in the grid, with the speeds ups
defined relative the location indicated by gid_norm_index.
Args:
fmodel (FlorisModel): The FlorisModel object to use to generate the power curve.
wind_speeds (np.array): The wind speeds to calculate the frequencies for.
Default is np.arange(0.0, 25.0, 1.0).
gid_norm_index (int): The index of the turbine to normalize the speed ups to.
Default is 0.
representative_wind_speed (float): The representative wind speed to use
in the power curve.
Returns:
HeterogeneousMap: The heterogeneous map object.
"""
############################
# Compute the power curve for combining the wind speeds
############################
if wind_speeds is None:
wind_speeds = self.wind_speeds
# Get a local copy
fm = copy.deepcopy(fmodel)
# Get the power curve for the turbine
# TODO: Maybe the power curve could be directly extracted
fm.set(
layout_x=[0],
layout_y=[0],
wind_data=TimeSeries(
wind_speeds=wind_speeds,
wind_directions=270.0,
turbulence_intensities=0.06,
),
)
fm.run()
turbine_power = fm.get_turbine_powers().flatten()
############################
# Identify the point on the original wrg grid closest to the x_loc and y_loc
############################
if x_loc is None or y_loc is None:
# Simply use the first point
gid_reference = 0
else:
# Find the closest point
gid_reference = np.argmin((self.x_gid - x_loc) ** 2 + (self.y_gid - y_loc) ** 2)
# Assign x_loc and y_loc to this point
x_loc = self.x_gid[gid_reference]
y_loc = self.y_gid[gid_reference]
print(f"Using point {gid_reference} at ({x_loc}, {y_loc}) as reference location")
############################
# Get the wind rose at this point
############################
wind_rose = self.get_wind_rose_at_point(
x=x_loc,
y=y_loc,
)
# Subset to the representative wind speed
# Check the represenative_wind_speed is valid
if representative_wind_speed in wind_rose.wind_speeds:
ws_idx = np.where(wind_rose.wind_speeds == representative_wind_speed)[0]
else:
raise ValueError("representative_wind_speed must be in original set")
# Create a new wind rose with only the specified wind speeds
wind_rose = WindRose(
wind_rose.wind_directions,
wind_rose.wind_speeds[ws_idx],
wind_rose.ti_table[:, ws_idx],
wind_rose.freq_table[:, ws_idx],
wind_rose.value_table[:, ws_idx] if wind_rose.value_table is not None else None,
wind_rose.compute_zero_freq_occurrence,
wind_rose.heterogeneous_map,
)
############################
# Calculate speed multipliers
############################
speed_multipliers = np.zeros((self.n_sectors, self.n_gid))
for direction_sector in range(self.n_sectors):
for gid in range(self.n_gid):
_, freq = self._generate_wind_speed_frequencies_from_weibull(
self.weibull_A_gid[gid, direction_sector],
self.weibull_k_gid[gid, direction_sector],
wind_speeds=wind_speeds,
)
# Record the expected power
speed_multipliers[direction_sector, gid] = np.sum(turbine_power * freq)
# Normalize the speed ups
speed_multipliers[direction_sector, :] = (
speed_multipliers[direction_sector, :]
/ speed_multipliers[direction_sector, gid_reference]
)
# Take the cube root of the speed ups to place in the frame of wind speed ups
speed_multipliers = np.cbrt(speed_multipliers)
# Create the heterogeneous map
heterogeneous_map = HeterogeneousMap(
x=self.x_gid,
y=self.y_gid,
wind_directions=self._wind_directions_wrg_file,
speed_multipliers=speed_multipliers,
)
# Return the wind rose with the heterogeneous map
return WindRose(
wind_directions=wind_rose.wind_directions,
wind_speeds=wind_rose.wind_speeds,
freq_table=wind_rose.freq_table,
ti_table=wind_rose.ti_table,
heterogeneous_map=heterogeneous_map,
)