"""Utility functions for calculating energy ratios."""
from __future__ import annotations
import warnings
from typing import List, Optional, Union
import numpy as np
import polars as pl
# TODO: This can likely be replaced with polars-native code someday: https://github.com/pola-rs/polars/issues/8551
def cut(
col_name: str,
edges: Union[np.ndarray, list],
) -> pl.Expr:
"""Bins the values in the specified column according to the given edges.
Args:
col_name (str): The name of the column to bin.
edges (array-like): The edges of the bins. Values will be placed into the bin
whose left edge is the largest edge less than or equal to
the value, and whose right edge is the smallest edge
greater than the value.
Returns:
        pl.Expr: A Polars expression that labels each value with the center of its
            bin; values outside the given edges map to null.
"""
c = pl.col(col_name)
labels = edges[:-1] + np.diff(edges) / 2.0
expr = pl.when(c < edges[0]).then(None)
for edge, label in zip(edges[1:], labels):
expr = expr.when(c < edge).then(label)
expr = expr.otherwise(None)
return expr
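# A minimal usage sketch for cut() (hypothetical column name and edge values):
#
#     df = pl.DataFrame({"ws": [0.4, 1.2, 2.7, 9.9]})
#     df.with_columns(cut("ws", edges=np.array([0.0, 1.0, 2.0, 3.0])).alias("ws_bin"))
#     # Each value is labeled with the center of its bin (0.5, 1.5, 2.5);
#     # 9.9 falls outside the edges and maps to null.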
def bin_column(
df_: pl.DataFrame,
col_name: str,
bin_col_name: str,
edges: Union[np.ndarray, list],
) -> pl.DataFrame:
"""Bins the values in the specified column of a Polars DataFrame according to the given edges.
Args:
df_ (pl.DataFrame): The Polars DataFrame containing the column to bin.
col_name (str): The name of the column to bin.
bin_col_name (str): The name to give the new column containing the bin labels.
edges (array-like): The edges of the bins. Values will be placed into the bin
whose left edge is the largest edge less than or equal to
the value, and whose right edge is the smallest edge
greater than the value.
Returns:
pl.DataFrame: A new Polars DataFrame with an additional column containing the bin labels.
"""
return df_.with_columns(
cut(col_name=col_name, edges=edges).alias(bin_col_name).cast(df_[col_name].dtype)
)
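# bin_column() is a thin wrapper around cut(); a quick sketch with hypothetical values.
# Note that the bin labels are cast back to the dtype of the source column:
#
#     df = pl.DataFrame({"ws": [0.4, 1.2, 2.7]})
#     bin_column(df, "ws", "ws_bin", np.arange(0.0, 4.0, 1.0))
#     # Adds a "ws_bin" column holding the bin centers 0.5, 1.5, 2.5.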
def add_ws(df_: pl.DataFrame, ws_cols: List[str], remove_all_nulls: bool = False) -> pl.DataFrame:
"""Add the ws column to a dataframe, given which columns to average over.
Args:
df_ (pl.DataFrame): The Polars DataFrame containing the column to bin.
ws_cols (list(str)): The name of the columns to average across.
        remove_all_nulls (bool): If True, remove rows where any of ws_cols is null;
            otherwise only remove rows where all of ws_cols are null. Defaults to False.
Returns:
pl.DataFrame: A new Polars DataFrame with an additional ws column
"""
df_with_mean_ws = (
# df_.select(pl.exclude('ws_bin')) # In case ws_bin already exists
df_.with_columns(
# df_.select(ws_cols).mean(axis=1).alias('ws_bin')
            ws=pl.concat_list(ws_cols).list.mean()  # "ws" is the row-wise mean of ws_cols
)
.filter(
pl.all_horizontal(pl.col(ws_cols).is_not_null())
if remove_all_nulls
else pl.any_horizontal(pl.col(ws_cols).is_not_null())
)
.filter((pl.col("ws").is_not_null()))
)
return df_with_mean_ws
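# Illustrative sketch of add_ws() (hypothetical column names), showing how
# remove_all_nulls changes which rows survive:
#
#     df = pl.DataFrame({"ws_000": [7.0, None, None], "ws_001": [8.0, 6.0, None]})
#     add_ws(df, ["ws_000", "ws_001"])
#     # Keeps rows with ANY valid wind speed: "ws" = 7.5 and 6.0.
#     add_ws(df, ["ws_000", "ws_001"], remove_all_nulls=True)
#     # Keeps only rows where ALL wind speeds are valid: "ws" = 7.5.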
def add_ws_bin(
df_: pl.DataFrame,
ws_cols: List[str],
ws_step: float = 1.0,
ws_min: float = -0.5,
ws_max: float = 50.0,
edges: Optional[Union[np.ndarray, list]] = None,
remove_all_nulls: bool = False,
) -> pl.DataFrame:
"""Add the ws_bin column to a dataframe.
Given which columns to average over and the step sizes to use
Args:
df_ (pl.DataFrame): The Polars DataFrame containing the column to bin.
ws_cols (list(str)): The name of the columns to average across.
ws_step (float): Step size for binning
ws_min (float): Minimum wind speed
ws_max (float): Maximum wind speed
edges (array-like): The edges of the bins. Values will be placed into the bin
whose left edge is the largest edge less than or equal to
the value, and whose right edge is the smallest edge
greater than the value. Defaults to None, in which case
the edges are generated using ws_step, ws_min, and ws_max.
        remove_all_nulls (bool): If True, remove rows where any of ws_cols is null;
            otherwise only remove rows where all of ws_cols are null. Defaults to False.
Returns:
pl.DataFrame: A new Polars DataFrame with an additional ws_bin column
"""
if edges is None:
edges = np.arange(ws_min, ws_max + ws_step, ws_step)
    # If provided, edges must contain at least two values
elif len(edges) < 2:
raise ValueError("edges must have length of at least 2")
df_with_mean_ws = add_ws(df_, ws_cols, remove_all_nulls)
# Filter to min and max
df_with_mean_ws = df_with_mean_ws.filter(
(pl.col("ws") >= ws_min) # Filter the mean wind speed
& (pl.col("ws") < ws_max)
)
return bin_column(df_with_mean_ws, "ws", "ws_bin", edges)
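# Sketch of the wind-speed binning pipeline (hypothetical values): with the default
# ws_step=1.0 and ws_min=-0.5, the bin centers land on whole numbers:
#
#     df = pl.DataFrame({"ws_000": [6.9, 7.4], "ws_001": [7.3, 7.8]})
#     add_ws_bin(df, ["ws_000", "ws_001"])
#     # "ws" holds the row means (7.1, 7.6); "ws_bin" holds the bin centers (7.0, 8.0).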
def add_wd(df_: pl.DataFrame, wd_cols: List[str], remove_all_nulls: bool = False) -> pl.DataFrame:
"""Add the wd column to a dataframe, given which columns to average over.
Args:
df_ (pl.DataFrame): The Polars DataFrame containing the column to bin.
wd_cols (list(str)): The name of the columns to average across.
        remove_all_nulls (bool): If True, remove rows where any of wd_cols is null;
            otherwise only remove rows where all of wd_cols are null. Defaults to False.
Returns:
pl.DataFrame: A new Polars DataFrame with an additional wd column
"""
# Gather up intermediate column names and final column names
wd_cols_cos = [c + "_cos" for c in wd_cols]
wd_cols_sin = [c + "_sin" for c in wd_cols]
cols_to_return = df_.columns
if "wd" not in cols_to_return:
cols_to_return = cols_to_return + ["wd"]
df_with_mean_wd = (
# df_.select(pl.exclude('wd_bin')) # In case wd_bin already exists
df_.filter(
pl.all_horizontal(pl.col(wd_cols).is_not_null())
if remove_all_nulls
else pl.any_horizontal(pl.col(wd_cols).is_not_null())
)
        # Add the cosine and sine columns
.with_columns(
[
pl.col(wd_cols).mul(np.pi / 180).cos().name.suffix("_cos"),
pl.col(wd_cols).mul(np.pi / 180).sin().name.suffix("_sin"),
]
)
)
df_with_mean_wd = (
df_with_mean_wd.with_columns(
[
# df_with_mean_wd.select(wd_cols_cos).mean(axis=1).alias('cos_mean'),
# df_with_mean_wd.select(wd_cols_sin).mean(axis=1).alias('sin_mean'),
pl.concat_list(wd_cols_cos).list.mean().alias("cos_mean"),
pl.concat_list(wd_cols_sin).list.mean().alias("sin_mean"),
]
)
.with_columns(
wd=np.mod(
pl.reduce(np.arctan2, [pl.col("sin_mean"), pl.col("cos_mean")]).mul(180 / np.pi),
360.0,
)
)
.filter((pl.col("wd").is_not_null()))
.select(cols_to_return) # Select for just the columns we want to return
)
return df_with_mean_wd
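# add_wd() uses a circular (vector) mean, which avoids the wrap-around problem of a
# naive average: 350 and 10 degrees should average to 0, not 180. A sketch with
# hypothetical column names:
#
#     df = pl.DataFrame({"wd_000": [350.0, 90.0], "wd_001": [10.0, 110.0]})
#     add_wd(df, ["wd_000", "wd_001"])
#     # "wd" is approximately 0 (equivalently 360) for the first row and 100 for the second.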
def add_wd_bin(
df_: pl.DataFrame,
wd_cols: List[str],
wd_step: float = 2.0,
wd_min: float = 0.0,
wd_max: float = 360.0,
edges: Optional[Union[np.ndarray, list]] = None,
remove_all_nulls: bool = False,
) -> pl.DataFrame:
"""Add the wd_bin column to a dataframe.
Given which columns to average over
and the step sizes to use
Args:
df_ (pl.DataFrame): The Polars DataFrame containing the column to bin.
wd_cols (list(str)): The name of the columns to average across.
wd_step (float): Step size for binning
wd_min (float): Minimum wind direction
wd_max (float): Maximum wind direction
edges (array-like): The edges of the bins. Values will be placed into the bin
whose left edge is the largest edge less than or equal to
the value, and whose right edge is the smallest edge
greater than the value. Defaults to None, in which case
            the edges are generated using wd_step, wd_min, and wd_max.
        remove_all_nulls (bool): If True, remove rows where any of wd_cols is null;
            otherwise only remove rows where all of wd_cols are null. Defaults to False.
Returns:
        pl.DataFrame: A new Polars DataFrame with an additional wd_bin column
"""
if edges is None:
edges = np.arange(wd_min, wd_max + wd_step, wd_step)
    # If provided, edges must have a length of at least 2
elif len(edges) < 2:
raise ValueError("edges must have length of at least 2")
# Add in the mean wd column
df_with_mean_wd = add_wd(df_, wd_cols, remove_all_nulls)
# Filter to min and max
df_with_mean_wd = df_with_mean_wd.filter(
(pl.col("wd") >= wd_min) # Filter the mean wind speed
& (pl.col("wd") < wd_max)
)
return bin_column(df_with_mean_wd, "wd", "wd_bin", edges)
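# Sketch of the wind-direction binning (hypothetical values): with the default
# wd_step=2.0 the bin centers fall on odd degrees:
#
#     df = pl.DataFrame({"wd_000": [1.5, 183.0], "wd_001": [0.5, 185.0]})
#     add_wd_bin(df, ["wd_000", "wd_001"])
#     # "wd" is roughly 1.0 and 184.0; "wd_bin" is 1.0 and 185.0.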
def add_power_test(
df_: pl.DataFrame,
test_cols: List[str],
) -> pl.DataFrame:
"""Add the pow_test column to a dataframe, given which columns to average over.
Args:
df_ (pl.DataFrame): The Polars DataFrame containing the column to bin.
test_cols (list(str)): The name of the columns to average across.
Returns:
pl.DataFrame: A new Polars DataFrame with an additional pow_test column
"""
return df_.with_columns(pow_test=pl.concat_list(test_cols).list.mean())
def add_power_ref(df_: pl.DataFrame, ref_cols: List[str]) -> pl.DataFrame:
"""Add the pow_ref column to a dataframe, given which columns to average over.
Args:
df_ (pl.DataFrame): The Polars DataFrame containing the column to bin.
ref_cols (list(str)): The name of the columns to average across.
Returns:
pl.DataFrame: A new Polars DataFrame with an additional pow_ref column
"""
return df_.with_columns(pow_ref=pl.concat_list(ref_cols).list.mean())
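# add_power_ref() and add_power_test() both reduce a set of turbine power columns to a
# single row-wise mean; nulls are ignored within each row. Hypothetical sketch:
#
#     df = pl.DataFrame({"pow_000": [1500.0, 1600.0], "pow_001": [1700.0, None]})
#     add_power_ref(df, ["pow_000", "pow_001"])
#     # "pow_ref" = 1600.0 for both rows (the second row averages only the valid value).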
def add_reflected_rows(
    df_: pl.DataFrame, edges: Union[np.ndarray, list], overlap_distance: float
) -> pl.DataFrame:
"""Add reflected rows to a dataframe.
Adds rows to a dataframe with where the wind direction is
reflected around the nearest edge if within overlap_distance
Given a wind direction DataFrame `df_`, this function adds
reflected rows to the DataFrame such that each wind direction
in the original DataFrame has a corresponding reflected wind
direction. The reflected wind direction is calculated by
subtracting the wind direction from the nearest edge in `edges`
and then subtracting that difference again from the
original wind direction. The resulting wind direction
is then wrapped around to the range [0, 360) degrees. The function
returns a new DataFrame with the original rows and the added reflected rows.
This function enables overlapping bins in the energy ratio functions
Args:
df_ : polars.DataFrame
The DataFrame to add reflected rows to.
edges : numpy.ndarray
An array of wind direction edges to use for reflection.
(Should be same as used in energy ratio)
overlap_distance : float
The maximum distance between a wind direction and an edge
for the wind direction to be considered overlapping.
Returns:
polars.DataFrame
A new DataFrame with the original rows and the added reflected rows.
"""
df_add = df_.clone()
wd = df_add["wd"].to_numpy()
diff_matrix = wd[:, None] - edges
abs_diff_matrix = np.abs(diff_matrix)
idx = np.argmin(abs_diff_matrix, axis=1)
signed_mins = diff_matrix[np.arange(len(diff_matrix)), idx]
df_add = (
df_add.with_columns(pl.Series(name="distances", values=signed_mins, dtype=pl.Float32))
.filter(pl.col("distances").abs() < overlap_distance)
.with_columns(np.mod((pl.col("wd") - pl.col("distances") * 2), 360.0))
.drop("distances")
)
return pl.concat([df_, df_add])
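# Worked sketch of the reflection (hypothetical values): with edges every 2 degrees and
# overlap_distance=0.5, a wind direction of 1.8 lies 0.2 degrees from the nearest edge
# (2.0), so a reflected copy is added at 2 * 2.0 - 1.8 = 2.2 degrees:
#
#     df = pl.DataFrame({"wd": [1.8, 5.0]})
#     add_reflected_rows(df, np.arange(0.0, 362.0, 2.0), overlap_distance=0.5)
#     # Returns the original two rows plus one reflected row with wd ~= 2.2;
#     # wd = 5.0 is a full degree from its nearest edge, so it is not reflected.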
def filter_all_nulls(
df_: pl.DataFrame,
ref_cols: List[str],
test_cols: List[str],
ws_cols: List[str],
wd_cols: List[str],
) -> pl.DataFrame:
"""Filter dataframe for ALL nulls.
Filter data by requiring ALL values of ref, test, ws, and wd to be valid
numbers.
Args:
df_ (pl.DataFrame): Polars dataframe possibly containing Null values
ref_cols (list[str]): A list of columns to use as the reference turbines
test_cols (list[str]): A list of columns to use as the test turbines
        ws_cols (list[str]): A list of columns to derive the wind speeds from
        wd_cols (list[str]): A list of columns to derive the wind directions from
    Returns:
        pl.DataFrame: The filtered dataframe, keeping only rows in which all of the
            listed columns contain valid values.
"""
return df_.filter(
pl.all_horizontal(pl.col(ref_cols + test_cols + ws_cols + wd_cols).is_not_null())
)
def filter_any_nulls(
df_: pl.DataFrame,
ref_cols: List[str],
test_cols: List[str],
ws_cols: List[str],
wd_cols: List[str],
) -> pl.DataFrame:
"""Filter dataframe for ANY nulls.
Filter data by requiring ANY of ref, ANY of test, ANY of ws, and ANY of wd
to be a valid number.
Args:
df_ (pl.DataFrame): Polars dataframe possibly containing Null values
ref_cols (list[str]): A list of columns to use as the reference turbines
test_cols (list[str]): A list of columns to use as the test turbines
        ws_cols (list[str]): A list of columns to derive the wind speeds from
        wd_cols (list[str]): A list of columns to derive the wind directions from
    Returns:
        pl.DataFrame: The filtered dataframe, keeping rows that have at least one valid
            value in each of the ref, test, ws, and wd column groups.
"""
return (
df_.filter(pl.any_horizontal(pl.col(ref_cols).is_not_null()))
.filter(pl.any_horizontal(pl.col(test_cols).is_not_null()))
.filter(pl.any_horizontal(pl.col(ws_cols).is_not_null()))
.filter(pl.any_horizontal(pl.col(wd_cols).is_not_null()))
)
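# The two filters differ in strictness; a hypothetical comparison with two reference
# power columns:
#
#     df = pl.DataFrame({
#         "pow_000": [1.0, None], "pow_001": [1.2, 1.1], "pow_002": [2.0, 2.1],
#         "ws_000": [8.0, 8.0], "wd_000": [270.0, 270.0],
#     })
#     filter_all_nulls(df, ["pow_000", "pow_001"], ["pow_002"], ["ws_000"], ["wd_000"])
#     # Keeps only the first row: every listed column must be valid.
#     filter_any_nulls(df, ["pow_000", "pow_001"], ["pow_002"], ["ws_000"], ["wd_000"])
#     # Keeps both rows: one valid value per column group is enough.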
def bin_and_group_dataframe(
df_: pl.DataFrame,
    ref_cols: List[str],
    test_cols: List[str],
    wd_cols: List[str],
    ws_cols: List[str],
wd_step: float = 2.0,
wd_min: float = 0.0,
wd_max: float = 360.0,
ws_step: float = 1.0,
ws_min: float = 0.0,
ws_max: float = 50.0,
wd_bin_overlap_radius: float = 0.0,
remove_all_nulls: bool = False,
    bin_cols_without_df_name: Optional[List[str]] = None,
num_df: int = 0,
) -> pl.DataFrame:
"""Bin and aggregate a DataFrame based on wind direction and wind speed parameters.
This function takes a Polars DataFrame (df_) and performs
binning and aggregation operations based on
wind direction (wd) and wind speed (ws). It allows for optional
handling of reflected rows and grouping by
specific columns. The resulting DataFrame contains aggregated
statistics for reference and test power
columns within specified bins.
Args:
df_ (DataFrame): The input Polars DataFrame to be processed.
ref_cols (List[str]): List of columns containing reference power data.
test_cols (List[str]): List of columns containing test power data.
wd_cols (List[str]): List of columns containing wind direction data.
ws_cols (List[str]): List of columns containing wind speed data.
wd_step (float, optional): Step size for wind direction binning. Defaults to 2.0.
wd_min (float, optional): Minimum wind direction value. Defaults to 0.0.
wd_max (float, optional): Maximum wind direction value. Defaults to 360.0.
ws_step (float, optional): Step size for wind speed binning. Defaults to 1.0.
ws_min (float, optional): Minimum wind speed value. Defaults to 0.0.
ws_max (float, optional): Maximum wind speed value. Defaults to 50.0.
wd_bin_overlap_radius (float, optional): Radius for overlapping wind direction bins.
Defaults to 0.0.
        remove_all_nulls (bool, optional): If True, remove rows where any of the ws/wd
            columns are null; otherwise only remove rows where all of them are null.
            Defaults to False.
bin_cols_without_df_name (List[str], optional): List of columns used
for grouping without 'df_name'.
        num_df (int, optional): The number of dataframes in which each ws/wd bin
            combination must appear in order to be retained. Defaults to 0.
Returns:
DataFrame: The resulting Polars DataFrame with aggregated statistics.
"""
# If wd_bin_overlap_radius is not zero, add reflected rows
if wd_bin_overlap_radius > 0.0:
# Need to obtain the wd column now rather than during binning
df_ = add_wd(df_, wd_cols, remove_all_nulls)
# Add reflected rows
edges = np.arange(wd_min, wd_max + wd_step, wd_step)
df_ = add_reflected_rows(df_, edges, wd_bin_overlap_radius)
# Assign the wd/ws bins
df_ = add_ws_bin(df_, ws_cols, ws_step, ws_min, ws_max, remove_all_nulls=remove_all_nulls)
df_ = add_wd_bin(df_, wd_cols, wd_step, wd_min, wd_max, remove_all_nulls=remove_all_nulls)
# Assign the reference and test power columns
df_ = add_power_ref(df_, ref_cols)
df_ = add_power_test(df_, test_cols)
bin_cols_with_df_name = bin_cols_without_df_name + ["df_name"]
# Group df_
df_ = (
df_.filter(
pl.all_horizontal(pl.col(bin_cols_with_df_name).is_not_null())
) # Select for all bin cols present
.group_by(bin_cols_with_df_name, maintain_order=True)
.agg([pl.mean("pow_ref"), pl.mean("pow_test"), pl.len().alias("count")])
# Enforce that each ws/wd bin combination has to appear in all dataframes
.filter(pl.len().over(bin_cols_without_df_name) == num_df)
)
return df_
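# End-to-end sketch (hypothetical turbine columns; the input is expected to carry a
# "df_name" column identifying the dataframe each row came from):
#
#     df = pl.DataFrame({
#         "df_name": ["baseline", "baseline", "controlled", "controlled"],
#         "wd_000": [270.5, 271.0, 270.2, 271.3],
#         "ws_000": [7.9, 8.1, 8.0, 8.2],
#         "pow_000": [1500.0, 1520.0, 1480.0, 1510.0],
#         "pow_001": [1400.0, 1410.0, 1450.0, 1460.0],
#     })
#     bin_and_group_dataframe(
#         df,
#         ref_cols=["pow_000"], test_cols=["pow_001"],
#         wd_cols=["wd_000"], ws_cols=["ws_000"],
#         bin_cols_without_df_name=["wd_bin", "ws_bin"], num_df=2,
#     )
#     # Yields one row per (wd_bin, ws_bin, df_name) with mean pow_ref, mean pow_test
#     # and a row count; bins not present in both dataframes are dropped (num_df=2).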
def add_bin_weights(
df_: pl.DataFrame,
    df_freq_pl: Optional[pl.DataFrame] = None,
    bin_cols_without_df_name: Optional[List[str]] = None,
weight_by: str = "min",
):
"""Add weights to DataFrame bins.
Add weights to DataFrame bins based on either frequency counts or
the provided frequency table df_freq_pl.
This function assigns weights to DataFrame bins. If 'df_freq_pl' is provided,
these weights are used directly. If 'df_freq_pl' is not provided, the function
calculates the weights from the input DataFrame 'df_'.
Weights can be determined as either the minimum ('min') or the sum ('sum') of counts.
Args:
df_ (DataFrame): The input Polars DataFrame containing bins and frequency counts.
df_freq_pl (DataFrame, optional): A Polars DataFrame containing frequency counts for bins.
If not provided, the function will calculate these counts from 'df_'.
bin_cols_without_df_name (List, optional): List of columns used for grouping
bins without 'df_name'.
weight_by (str, optional): Weight calculation method, either 'min'
(minimum count) or 'sum' (sum of counts).
Defaults to 'min'.
Returns:
Tuple[pl.DataFrame, pl.DataFrame]: A tuple containing the modified DataFrame 'df_'
with added weights and the DataFrame
'df_freq_pl' with the calculated frequency counts.
Raises:
RuntimeError: If none of the ws/wd bins in data appear in df_freq.
UserWarning: If some bins in data are not in df_freq and will receive a weight of 0.
"""
if df_freq_pl is None:
# Determine the weights per bin as either the min or sum count
df_freq_pl = (
df_.select(bin_cols_without_df_name + ["count"])
.group_by(bin_cols_without_df_name)
.agg([pl.min("count") if weight_by == "min" else pl.sum("count")])
.rename({"count": "weight"})
)
    df_ = df_.join(df_freq_pl, on=["wd_bin", "ws_bin"], how="left")
# Check if all the values in the weight column are null
if df_["weight"].is_null().all():
raise RuntimeError("None of the ws/wd bins in data appear in df_freq")
# Check if any of the values in the weight column are null
if df_["weight"].is_null().any():
warnings.warn("Some bins in data are not in df_freq and will get 0 weight")
# Fill the null values with zeros
df_ = df_.with_columns(pl.col("weight").fill_null(strategy="zero"))
# Normalize the weights
df_ = df_.with_columns(pl.col("weight").truediv(pl.col("weight").sum()))
return df_, df_freq_pl
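# Minimal sketch of the weighting step (hypothetical counts), applied to the output of
# bin_and_group_dataframe so the bin columns and "count" already exist:
#
#     df = pl.DataFrame({
#         "wd_bin": [271.0, 271.0], "ws_bin": [8.5, 8.5],
#         "df_name": ["baseline", "controlled"],
#         "pow_ref": [1510.0, 1495.0], "pow_test": [1405.0, 1455.0],
#         "count": [10, 4],
#     })
#     df_w, df_freq = add_bin_weights(df, bin_cols_without_df_name=["wd_bin", "ws_bin"])
#     # With weight_by="min", the single (wd_bin, ws_bin) bin gets weight min(10, 4) = 4;
#     # after normalization over all rows, each of the two rows carries weight 0.5.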