Source code for flasc.flasc_dataframe

"""FLASC DataFrame module."""

from __future__ import annotations

import pandas as pd
from pandas import DataFrame
from wind_up.constants import (
    DataColumns,
    RAW_DOWNTIME_S_COL,
    RAW_POWER_COL,
    RAW_WINDSPEED_COL,
    RAW_YAWDIR_COL,
    TIMESTAMP_COL,
)


# Create a new DataFrame subclass
class FlascDataFrame(DataFrame):
    """Subclass of pandas.DataFrame for working with FLASC data.

    Stores data in preferred Flasc format, or user format, with option to convert between the two.

    The data is considered to be in FLASC format when it has both a "time" and a "pow_000"
    column. The user format may be wide (one column per channel) or long (one row per
    time/variable pair), as selected by ``long_data_columns``.
    """

    # Attributes to pickle must be in this list
    _metadata = [
        "channel_name_map",
        "_user_format",
        "_long_data_columns",
    ]

    def __init__(self, *args, channel_name_map=None, long_data_columns=None, **kwargs):
        """Initialize the FlascDataFrame class, a subclass of pandas.DataFrame.

        Args:
            *args: arguments to pass to the DataFrame constructor
            channel_name_map (dict): Dictionary of column names to map from the user format to the
                FLASC format, where the key string is the user format and the value string is the
                FLASC equivalent. Defaults to None.
            long_data_columns (dict): Dictionary of column names for long format data. Must
                contain the keys "variable_column" and "value_column", e.g.
                {"variable_column": "variable", "value_column": "value"}.
                If not provided, user data format assumed to be wide.
            **kwargs: keyword arguments to pass to the DataFrame constructor

        Raises:
            ValueError: If channel_name_map is not a dictionary of strings, or if
                long_data_columns is not a dictionary containing the required keys.
        """
        super().__init__(*args, **kwargs)

        # check that name_map dictionary is valid
        if channel_name_map is not None:
            if not isinstance(channel_name_map, dict):
                raise ValueError("channel_name_map must be a dictionary")
            if not all(
                isinstance(k, str) and isinstance(v, str) for k, v in channel_name_map.items()
            ):
                raise ValueError("channel_name_map must be a dictionary of strings")
        self.channel_name_map = channel_name_map

        # Determine the user format
        if long_data_columns is None:
            self._user_format = "wide"
            self._long_data_columns = None
        else:
            self._user_format = "long"

            # Confirm the long_data_columns is a dictionary with the correct keys
            if not isinstance(long_data_columns, dict):
                raise ValueError("long_data_columns must be a dictionary")
            if not all(col in long_data_columns for col in ["variable_column", "value_column"]):
                raise ValueError(
                    "long_data_columns must contain keys 'variable_column', "
                    "and 'value_column'"
                )
            self._long_data_columns = long_data_columns

    @property
    def in_flasc_format(self):
        """Return True if the data is in FLASC format, False otherwise."""
        return ("time" in self.columns) and ("pow_000" in self.columns)

    @property
    def _constructor(self):
        # Lets pandas operations that build new frames return FlascDataFrame instances.
        return FlascDataFrame

    def __str__(self):
        """Printout when calling print(df)."""
        if self.in_flasc_format:
            return "FlascDataFrame in FLASC format\n" + super().__str__()
        else:
            return f"FlascDataFrame in user ({self._user_format}) format\n" + super().__str__()

    def _repr_html_(self):
        """Printout when displaying results in jupyter notebook.

        Returns:
            str | None: Format header followed by the pandas HTML representation, or None
                when pandas produces no HTML representation.
        """
        html = super()._repr_html_()
        if html is None:
            # pandas returns None when HTML repr is disabled; prefixing it would raise TypeError.
            return None
        if self.in_flasc_format:
            return "FlascDataFrame in FLASC format\n" + html
        else:
            return f"FlascDataFrame in user ({self._user_format}) format\n" + html

    @property
    def n_turbines(self):
        """Return the number of turbines in the dataset.

        Raises:
            ValueError: If the data is not in FLASC format.
        """
        self.check_flasc_format()

        # Count how many columns are of the form 'pow_###'; non-string labels never match.
        return sum(
            isinstance(c, str) and len(c) == 7 and c[:4] == "pow_" and c[4:].isdigit()
            for c in self.columns
        )

    def check_flasc_format(self):
        """Raise an error if the data is not in FLASC format.

        Raises:
            ValueError: If the data is not in FLASC format.
        """
        if not self.in_flasc_format:
            raise ValueError(
                "Data must be in FLASC format to perform this operation. "
                "Call df.convert_to_flasc_format() to convert the data to FLASC format."
            )

    def copy_metadata(self, other):
        """Copy metadata from another FlascDataFrame to self.

        Args:
            other (FlascDataFrame): DataFrame to copy metadata from.
        """
        for attr in self._metadata:
            setattr(self, attr, getattr(other, attr))

    def convert_to_user_format(self, inplace=False):
        """Convert the DataFrame to the format that the user expects, given the channel_name_map.

        Args:
            inplace (bool): If True, modify the DataFrame in place.
                If False, return a new DataFrame.

        Returns:
            FlascDataFrame: FlascDataFrame in user format if inplace is False, None otherwise.

        """
        # Check if already in user format
        if not self.in_flasc_format:
            return None if inplace else self.copy()

        # Make a copy of self
        df_user = self.copy()

        # Rename the channel columns to user-specified names (inverse of channel_name_map)
        if self.channel_name_map is not None:
            df_user.rename(columns={v: k for k, v in self.channel_name_map.items()}, inplace=True)

        # Convert the format to long if _user_format is long
        if self._user_format == "long":
            df_user = self._convert_wide_to_long(df_user)

        # Assign to self or return
        if inplace:
            # Re-initialize self from the converted frame, keeping the same metadata
            self.__init__(
                df_user,
                channel_name_map=self.channel_name_map,
                long_data_columns=self._long_data_columns,
            )
            return None
        return df_user

    def convert_time_to_datetime(self, inplace=False):
        """Convert the time column to a datetime representation.

        Args:
            inplace (bool): If True, modify the DataFrame in place.
                If False, return a new DataFrame.

        Returns:
            FlascDataFrame: FlascDataFrame with time column as datetime object if inplace is False,
            None otherwise

        Raises:
            KeyError: If there is no "time" column.
        """
        if "time" not in self.columns:
            raise KeyError("Column 'time' must be present in the DataFrame")

        if inplace:
            self["time"] = pd.to_datetime(self["time"])
            return None
        df = self.copy()
        df["time"] = pd.to_datetime(df["time"])
        return df

    def convert_to_flasc_format(self, inplace=False):
        """Convert the DataFrame to the format that FLASC expects.

        Args:
            inplace (bool): If True, modify the DataFrame in place. If False,
                return a new DataFrame.

        Returns:
            FlascDataFrame: FlascDataFrame in FLASC format if inplace is False, None otherwise

        # TODO: could consider converting "time" to datetime type here. If so, will want to keep
        # the original "time" column for back-conversion if needed.
        # Similarly, we could sort on time, but perhaps both are too meddlesome
        """
        # Check if already in flasc format
        if self.in_flasc_format:
            return None if inplace else self.copy()

        # Make a copy of self
        df_flasc = self.copy()

        # Convert back from long if necessary
        if self._user_format == "long":
            df_flasc = self._convert_long_to_wide(df_flasc)

        # Rename the channel columns to flasc-naming convention
        if self.channel_name_map is not None:
            df_flasc.rename(columns=self.channel_name_map, inplace=True)

        # Assign to self or return
        if inplace:
            # Re-initialize self from the converted frame, keeping the same metadata
            self.__init__(
                df_flasc,
                channel_name_map=self.channel_name_map,
                long_data_columns=self._long_data_columns,
            )
            return None
        return df_flasc

    def _convert_long_to_wide(self, df_):
        """Convert a long format DataFrame to a wide format DataFrame.

        Args:
            df_ (FlascDataFrame): Long format FlascDataFrame

        Returns:
            FlascDataFrame: Wide format FlascDataFrame
        """
        # Pivot the table so the variable column becomes the column names with time
        # kept as the first column and value as the values. reset_index turns the
        # time index back into a regular column.
        df_ = df_.pivot(
            index="time",
            columns=self._long_data_columns["variable_column"],
            values=self._long_data_columns["value_column"],
        ).reset_index()

        # Remove the name that pivot leaves on the columns axis
        df_.columns.name = None

        return FlascDataFrame(
            df_,
            channel_name_map=self.channel_name_map,
            long_data_columns=self._long_data_columns,
        )

    def _convert_wide_to_long(self, df_):
        """Convert a wide format DataFrame to a long format DataFrame.

        Args:
            df_ (FlascDataFrame): Wide format FlascDataFrame

        Returns:
            FlascDataFrame: Long format FlascDataFrame
        """
        variable_column = self._long_data_columns["variable_column"]
        df_ = df_.melt(
            id_vars="time",
            var_name=variable_column,
            value_name=self._long_data_columns["value_column"],
        ).sort_values(["time", variable_column])

        # Reset index for cleanliness
        df_ = df_.reset_index(drop=True)

        return FlascDataFrame(
            df_,
            channel_name_map=self.channel_name_map,
            long_data_columns=self._long_data_columns,
        )

    def to_feather(self, path, **kwargs):
        """Print a notice about lost information and save to feather format.

        Args:
            path: Passed through to pandas.DataFrame.to_feather.
            **kwargs: Passed through to pandas.DataFrame.to_feather.
        """
        print(
            "Dataframe will be saved as a pandas DataFrame. "
            "Extra attributes from FlascDataFrame will be lost. "
            "We recommend using df.to_pickle() and pd.read_pickle() instead, "
            "as this will retain FlascDataFrame attributes."
        )
        return super().to_feather(path, **kwargs)

    def export_to_windup_format(
        self,
        turbine_names: list[str] | None = None,
        time_col: str = "time",
        power_col: str = "pow",
        windspeed_col: str = "ws",
        winddirection_col: str = "wd",
        normal_operation_col: str | None = None,
        pitchangle_col: str | None = None,
        genrpm_col: str | None = None,
        downtimecounter_col: str | None = None,
        turbine_num_digits: int = 3,
    ):
        """Convert the DataFrame to the format that wind-up expects.

        The wide per-turbine columns (e.g. ``pow_000``) are stacked into one block of rows
        per turbine, with the turbine suffix stripped from the column names, a turbine name
        column added and the time column used as the index.

        Args:
            turbine_names (list[str] | None): Names to assign to the turbines, in turbine
                index order. If None, the zero-padded turbine index is used as the name.
            time_col (str): Name of the time column.
            power_col (str): Prefix of the power columns.
            windspeed_col (str): Prefix of the wind speed columns.
            winddirection_col (str): Prefix of the wind direction columns.
            normal_operation_col (str | None): Prefix of a per-turbine column flagging normal
                operation. If given, rows where it is not True have their non-raw data
                columns set to NA.
            pitchangle_col (str | None): Prefix of the pitch angle columns. If None, pitch
                angle is filled with 0.
            genrpm_col (str | None): Prefix of the generator rpm columns. If None, generator
                rpm is filled with 1000.
            downtimecounter_col (str | None): Prefix of the downtime counter columns. If None,
                downtime is filled with 0.
            turbine_num_digits (int): Number of digits in the turbine suffix.

        Returns:
            pandas.DataFrame: SCADA data with the wind-up column names.

        Raises:
            ValueError: If turbine_names is given and its length does not match the number of
                turbines found in the columns.
        """
        # Only string labels can carry a turbine suffix
        str_cols = [col for col in self.columns if isinstance(col, str)]

        # figure out how many turbines there are from columns
        nt = sum(
            col.startswith(f"{power_col}_") and col[-turbine_num_digits:].isdigit()
            for col in str_cols
        )

        # if turbine_names provided check it matches
        if turbine_names is not None and len(turbine_names) != nt:
            msg = (
                f"Number of names in turbine_names, {len(turbine_names)}, "
                f"does not match number of turbines in SCADA data, {nt}."
            )
            raise ValueError(msg)

        # build one frame per turbine, then stack them with a single concat
        turbine_num_format = f"0{turbine_num_digits}d"
        frames = []
        for i in range(nt):
            suffix = f"_{i:{turbine_num_format}}"
            wtg_cols = [col for col in str_cols if col.endswith(suffix)]
            wtg_df = pd.DataFrame(self[[time_col, *wtg_cols]]).__finalize__(None)
            # strip the "_###" turbine suffix from the column names
            wtg_df.columns = [time_col, *[x[: -(turbine_num_digits + 1)] for x in wtg_cols]]
            wtg_df[DataColumns.turbine_name] = (
                turbine_names[i] if turbine_names is not None else f"{i:{turbine_num_format}}"
            )
            frames.append(wtg_df)
        scada_df = pd.concat(frames) if frames else pd.DataFrame()

        scada_df = scada_df.set_index(time_col)
        scada_df.index.name = (
            TIMESTAMP_COL  # assumption is that flasc timestamps are UTC start format
        )
        scada_df = scada_df.rename(
            columns={
                power_col: RAW_POWER_COL,  # DataColumns.active_power_mean,
                windspeed_col: RAW_WINDSPEED_COL,  # DataColumns.wind_speed_mean,
                winddirection_col: RAW_YAWDIR_COL,  # DataColumns.yaw_angle_mean,
            }
        )

        if pitchangle_col is None:
            scada_df[DataColumns.pitch_angle_mean] = 0
        else:
            scada_df = scada_df.rename(columns={pitchangle_col: DataColumns.pitch_angle_mean})
        if genrpm_col is None:
            scada_df[DataColumns.gen_rpm_mean] = 1000
        else:
            scada_df = scada_df.rename(columns={genrpm_col: DataColumns.gen_rpm_mean})
        if downtimecounter_col is None:
            scada_df[RAW_DOWNTIME_S_COL] = 0
        else:
            # Rename to the raw column so the copy into shutdown_duration below finds it
            scada_df = scada_df.rename(columns={downtimecounter_col: RAW_DOWNTIME_S_COL})

        # Keep the raw columns and populate the working columns as copies of them
        scada_df[DataColumns.active_power_mean] = scada_df[RAW_POWER_COL]
        scada_df[DataColumns.wind_speed_mean] = scada_df[RAW_WINDSPEED_COL]
        scada_df[DataColumns.yaw_angle_mean] = scada_df[RAW_YAWDIR_COL]
        scada_df[DataColumns.shutdown_duration] = scada_df[RAW_DOWNTIME_S_COL]

        if normal_operation_col is not None:
            # Blank everything except the flag itself, the raw columns and the turbine name
            cols_to_filter = [
                col
                for col in scada_df.columns
                if col != normal_operation_col
                and "raw_" not in col
                and col != DataColumns.turbine_name
            ]
            scada_df.loc[~scada_df[normal_operation_col].isin([True]), cols_to_filter] = pd.NA
        return scada_df