# -*- coding: utf-8 -*-
"""
RPM Clustering Module
"""
from copy import deepcopy
import logging

import geopandas as gpd
import numpy as np
import pywt
from scipy.spatial import cKDTree
from shapely.geometry import Point

from reVX.handlers.outputs import Outputs
from reVX.utilities.cluster_methods import ClusteringMethods

logger = logging.getLogger(__name__)


class RPMClusters:
    """
    Base class for RPM clusters

    Examples
    --------
    >>> from reV import Resource
    >>>
    >>> fname = '$TESTDATADIR/reV_gen/gen_pv_2012.h5'
    >>> with Resource(fname) as res:
    ...     gen_gids = res.meta.index.values
    >>>
    >>> clusters = RPMClusters(fname, gen_gids, n_clusters=6)
    >>> clusters._cluster()
    >>> clusters.meta
        gen_gid   latitude  longitude  cluster_id                    geometry
    0         0  41.290001 -71.860001           0  POINT (-71.86000 41.29000)
    1         1  41.290001 -71.820000           0  POINT (-71.82000 41.29000)
    2         2  41.250000 -71.820000           4  POINT (-71.82000 41.25000)
    3         3  41.330002 -71.820000           0  POINT (-71.82000 41.33000)
    4         4  41.369999 -71.820000           0  POINT (-71.82000 41.37000)
    ..      ...        ...        ...         ...                         ...
    95       95  41.250000 -71.660004           4  POINT (-71.66000 41.25000)
    96       96  41.889999 -71.660004           5  POINT (-71.66000 41.89000)
    97       97  41.450001 -71.660004           3  POINT (-71.66000 41.45000)
    98       98  41.610001 -71.660004           1  POINT (-71.66000 41.61000)
    99       99  41.410000 -71.660004           3  POINT (-71.66000 41.41000)

    Generate a shapefile of the clusters:

    >>> RPMClusters.generate_shapefile(clusters.meta, fpath='./test.shp')
    """
    def __init__(self, cf_fpath, gen_gids, n_clusters, region=None):
        """
        Parameters
        ----------
        cf_fpath : str
            Path to reV .h5 file containing desired capacity factor profiles
        gen_gids : list | ndarray
            List or vector of gen_gids to cluster on
        n_clusters : int
            Number of clusters to identify
        region : str | None
            Optional region identifier that you are clustering on for
            better debugging.
        """
        self._region = region
        self._meta, self._coefficients = self._parse_data(cf_fpath, gen_gids)
        self._n_clusters = n_clusters
        msg = ('Number of clusters for "{}" must be >= 1!'
               .format(self._region))
        assert self._n_clusters > 0, msg

    @property
    def coefficients(self):
        """
        Returns
        -------
        _coefficients : ndarray
            Array of wavelet coefficients for each gen_gid
        """
        return self._coefficients

    @property
    def meta(self):
        """
        Returns
        -------
        _meta : pandas.DataFrame
            DataFrame of meta data:
            - gen_gid
            - latitude
            - longitude
            - cluster_id
            - rank
        """
        return self._meta

    @property
    def n_clusters(self):
        """
        Returns
        -------
        _n_clusters : int
            Number of clusters
        """
        return self._n_clusters

    @property
    def cluster_coefficients(self):
        """
        Returns
        -------
        cluster_coeffs : ndarray
            Representative (mean) coefficients for each cluster
        """
        cluster_coeffs = None
        if 'cluster_id' in self._meta:
            cluster_coeffs = []
            for _, cdf in self._meta.groupby('cluster_id'):
                idx = cdf.index.values
                cluster_coeffs.append(self.coefficients[idx].mean(axis=0))

            cluster_coeffs = np.array(cluster_coeffs)

        return cluster_coeffs

    @property
    def cluster_ids(self):
        """
        Returns
        -------
        cluster_ids : ndarray
            Cluster cluster_id for each gen_gid
        """
        cluster_ids = None
        if 'cluster_id' in self._meta:
            cluster_ids = self._meta['cluster_id'].values

        return cluster_ids

    @property
    def cluster_coordinates(self):
        """
        Returns
        -------
        cluster_coords : ndarray
            lon, lat coordinates of the centroid of each cluster
        """
        cluster_coords = None
        if 'cluster_id' in self._meta:
            cluster_coords = self._meta.groupby('cluster_id')
            cluster_coords = cluster_coords[['longitude', 'latitude']].mean()
            cluster_coords = cluster_coords.values

        return cluster_coords

    @property
    def coordinates(self):
        """
        Returns
        -------
        coords : ndarray
            lon, lat coordinates for each gen_gid
        """
        coords = self._meta[['longitude', 'latitude']].values

        return coords

    @classmethod
    def _parse_data(cls, cf_fpath, gen_gids):
        """
        Extract lat, lon coordinates for given gen_gids and convert
        cf_profiles into wavelet coefficients

        Parameters
        ----------
        cf_fpath : str
            Path to reV .h5 file containing desired capacity factor profiles
        gen_gids : list | ndarray
            List or vector of gen_gids to cluster on

        Returns
        -------
        meta : pandas.DataFrame
            Meta data (gen_gid, latitude, longitude) for the given gen_gids
        coeff : ndarray
            Wavelet coefficients for each gen_gid
        """
        with Outputs(cf_fpath, mode='r', unscale=False) as cfs:
            meta = cfs.meta.loc[gen_gids, ['latitude', 'longitude']]
            # read a contiguous gid slice from the .h5 then subset in memory
            gid_slice, gid_idx = cls._gid_pos(gen_gids)
            coeff = cfs['cf_profile', :, gid_slice][:, gid_idx]

        meta['gen_gid'] = gen_gids
        cols = ['gen_gid', 'latitude', 'longitude']
        meta = meta[cols].reset_index(drop=True)
        coeff = cls._calculate_wavelets(coeff.T)

        return meta, coeff

    @staticmethod
    def _gid_pos(gen_gids):
        """
        Parameters
        ----------
        gen_gids : list | ndarray
            List or vector of gen_gids to cluster on

        Returns
        -------
        gid_slice : slice
            Slice that encompasses the entire gen_gid range
        gid_idx : ndarray
            Adjusted list to extract gen_gids of interest from slice
        """
        if isinstance(gen_gids, list):
            gen_gids = np.array(gen_gids)

        s = gen_gids.min()
        e = gen_gids.max() + 1
        gid_slice = slice(s, e, None)
        gid_idx = gen_gids - s

        return gid_slice, gid_idx

    @staticmethod
    def _calculate_wavelets(ts_arrays):
        """ Calculate the wavelet coefficients of each timeseries within
        ndarray """
        coefficients = RPMWavelets.get_dwt_coefficients(ts_arrays)

        return coefficients

    def _cluster_coefficients(self, method="kmeans", **kwargs):
        """ Apply a clustering method to the wavelet coefficients """
        logger.debug('Applying {} clustering'.format(method))

        c_func = getattr(ClusteringMethods, method)
        labels = c_func(self.coefficients, n_clusters=self.n_clusters,
                        **kwargs)

        return labels

    def _dist_rank_optimization(self, norm=None):
        """
        Re-cluster data by minimizing the sum of the:
        - distance between each point and each cluster centroid
        - RMSE between each point's coefficients and each cluster's
          representative (mean) coefficients

        Parameters
        ----------
        norm : str
            Normalization method to use
            (see sklearn.preprocessing.normalize);
            if None range normalize

        Returns
        -------
        new_labels : ndarray
            New cluster labels
        """
        cluster_coeffs = self.cluster_coefficients
        cluster_centroids = self.cluster_coordinates
        rmse = []
        dist = []
        for centroid, rep_coeffs in zip(cluster_centroids, cluster_coeffs):
            c_rmse = np.mean((self.coefficients - rep_coeffs) ** 2,
                             axis=1) ** 0.5
            rmse.append(c_rmse)
            c_dist = np.linalg.norm(self.coordinates - centroid, axis=1)
            dist.append(c_dist)

        rmse = ClusteringMethods._normalize_values(np.array(rmse), norm=norm)
        dist = ClusteringMethods._normalize_values(np.array(dist), norm=norm)
        err = (dist ** 2 + rmse ** 2)
        new_labels = np.argmin(err, axis=0)

        return new_labels

    def _dist_rank_filter(self, iterate=True, norm=None):
        """
        Re-cluster data by minimizing the sum of the:
        - distance between each point and each cluster centroid
        - RMSE between each point's coefficients and each cluster's
          representative (mean) coefficients

        Parameters
        ----------
        iterate : bool
            Iterate on _dist_rank_optimization until cluster centroids and
            profiles start to converge
        norm : str
            Normalization method to use
            (see sklearn.preprocessing.normalize);
            if None range normalize

        Returns
        -------
        new_labels : ndarray
            New cluster labels
        """
        clusters = deepcopy(self)
        coeffs = clusters.cluster_coefficients
        centroids = clusters.cluster_coordinates
        dist, rmse = 0, 0
        while True:
            new_labels = clusters._dist_rank_optimization(norm=norm)
            clusters._meta['cluster_id'] = new_labels
            if iterate:
                c_coeffs = clusters.cluster_coefficients
                c_centroids = clusters.cluster_coordinates
                # break once the iteration-to-iteration shift of the
                # centroids and representative coefficients stops growing
                dist_i = np.linalg.norm(c_centroids - centroids)
                rmse_i = np.mean((c_coeffs - coeffs) ** 2) ** 0.5
                if dist_i <= dist and rmse_i <= rmse:
                    break
                else:
                    dist, rmse = dist_i, rmse_i
                    coeffs, centroids = c_coeffs, c_centroids
            else:
                break

        return new_labels

    @staticmethod
    def _get_cluster_geom(gdf_points):
        """ Generate cluster polygons as a geopandas dataframe """
        lookup = gdf_points[['latitude', 'longitude']].values
        tree = cKDTree(lookup)  # pylint: disable=not-callable
        # k=2: the first hit is the point itself at distance 0, the second
        # its nearest neighbor, so this is the mean nearest-neighbor spacing
        dists, _ = tree.query(lookup, k=2)
        mean_dist = dists.T[1].mean()
        gdf_poly = gdf_points.copy()
        gdf_poly.geometry = gdf_poly.geometry.buffer(mean_dist)
        clusters = gdf_poly.dissolve(by='cluster_id').reset_index()
        clusters.geometry = clusters.geometry.buffer(-mean_dist / 2)

        return clusters, mean_dist

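    # Illustrative sketch (not part of reVX): the buffer/dissolve approach
    # above turns each point cluster into a polygon by buffering every point
    # by the mean nearest-neighbor spacing and merging overlapping buffers.
    # A minimal standalone example of that spacing calculation:
    #
    #     import numpy as np
    #     from scipy.spatial import cKDTree
    #
    #     pts = np.array([[0.0, 0.0], [0.0, 1.0], [5.0, 5.0]])
    #     dists, _ = cKDTree(pts).query(pts, k=2)
    #     mean_nn = dists[:, 1].mean()  # each point's nearest-neighbor dist
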
    @classmethod
    def generate_shapefile(cls, meta, fpath, beautify=True,
                           source_crs="EPSG:4326", target_crs=None):
        """ Generate cluster polygons and save to shapefile """
        geometry = [Point(xy) for xy in zip(meta.longitude, meta.latitude)]
        gdf_points = gpd.GeoDataFrame(meta, geometry=geometry,
                                      crs=source_crs)
        if target_crs is not None and source_crs != target_crs:
            gdf_points = gdf_points.to_crs(target_crs)

        clusters, mean_dist = cls._get_cluster_geom(gdf_points)

        if beautify:
            # smooth polygon edges and keep only the largest part of any
            # multi-part cluster polygon
            clusters.geometry = clusters.geometry.buffer(-mean_dist)
            clusters.geometry = clusters.geometry.buffer(mean_dist)
            for index, _ in clusters.iterrows():
                geom = clusters.loc[index, 'geometry']
                if geom.geom_type == 'MultiPolygon':
                    # note: shapely >= 2 requires iterating geom.geoms here
                    clusters.loc[index, 'geometry'] = max(
                        geom, key=lambda a: a.area)

        clusters[['cluster_id', 'geometry']].to_file(fpath)

        return fpath

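    # Example usage (hypothetical paths and CRS): write beautified cluster
    # polygons to a reprojected shapefile after clustering, e.g.
    #
    #     RPMClusters.generate_shapefile(clusters.meta, './clusters.shp',
    #                                    beautify=True,
    #                                    target_crs='EPSG:3857')
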
    def _contiguous_filter(self, drop_islands=True, buffer_weight=2,
                           source_crs="EPSG:4326"):
        """ Re-classify clusters by making contiguous cluster polygons """
        meta = self._meta
        geometry = [Point(xy) for xy in zip(meta.longitude, meta.latitude)]
        gdf_points = gpd.GeoDataFrame(meta, geometry=geometry,
                                      crs=source_crs)
        clusters, mean_dist = self._get_cluster_geom(gdf_points)

        # Drop islands
        if drop_islands:
            buffer = buffer_weight * mean_dist
            clusters.geometry = clusters.geometry.buffer(-buffer)
            clusters.geometry = clusters.geometry.buffer(buffer)
            for index, _ in clusters.iterrows():
                geom = clusters.loc[index, 'geometry']
                if geom.geom_type == 'MultiPolygon':
                    # note: shapely >= 2 requires iterating geom.geoms here
                    clusters.loc[index, 'geometry'] = max(
                        geom, key=lambda a: a.area)

        # note: newer geopandas renames the `op` kwarg to `predicate`
        intersected = gpd.sjoin(gdf_points, clusters, how="left",
                                op='intersects')

        # points intersecting multiple cluster polygons are unassigned here
        # and re-assigned to the nearest assigned point's cluster below
        gid_counts = intersected.groupby('gen_gid_left').size()
        duplicate_gids = gid_counts[gid_counts > 1].index
        mask = intersected['gen_gid_left'].isin(duplicate_gids)
        intersected.loc[mask, 'cluster_id_right'] = None
        intersected = intersected.drop_duplicates(subset=['gen_gid_left'])

        mask = np.isnan(intersected.cluster_id_right)
        assigned = intersected[~mask].reset_index()
        unassigned = intersected[mask]

        lookup = assigned[['latitude_left', 'longitude_left']].values
        target = unassigned[['latitude_left', 'longitude_left']].values
        tree = cKDTree(lookup)  # pylint: disable=not-callable
        _, inds = tree.query(target, k=1)

        for i, ind in enumerate(list(unassigned.index)):
            nearest_cluster_id = assigned.loc[inds[i], 'cluster_id_left']
            intersected.loc[ind, 'cluster_id_left'] = nearest_cluster_id

        new_labels = intersected['cluster_id_left']

        return new_labels

    def _calculate_ranks(self):
        """ Determine the rank of each location within all clusters based
        on the root mean square errors """
        cluster_coeffs = self.cluster_coefficients
        for i, cdf in self.meta.groupby('cluster_id'):
            pos = cdf.index
            rep_coeffs = cluster_coeffs[i]
            coeffs = self.coefficients[pos]
            err = np.mean((coeffs - rep_coeffs) ** 2, axis=1) ** 0.5
            rank = np.argsort(err)
            self._meta.loc[pos, 'rank'] = rank

    def _cluster(self, method='kmeans', method_kwargs=None,
                 dist_rank_filter=True, dist_rmse_kwargs=None,
                 contiguous_filter=True, contiguous_kwargs=None):
        """
        Run three step RPM clustering procedure:
        1) Cluster on wavelet coefficients
        2) Clean up clusters by optimizing rmse and distance
        3) Remove islands using polygon intersection

        Parameters
        ----------
        method : str
            Method to use to cluster coefficients
        method_kwargs : dict
            Kwargs for running _cluster_coefficients
        dist_rank_filter : bool
            Run _dist_rank_filter
        dist_rmse_kwargs : dict
            Kwargs for running _dist_rank_optimization
        contiguous_filter : bool
            Run _contiguous_filter
        contiguous_kwargs : dict
            Kwargs for _contiguous_filter
        """
        if self.n_clusters <= 1:
            dist_rank_filter = False
            contiguous_filter = False

        if method_kwargs is None:
            method_kwargs = {}

        labels = self._cluster_coefficients(method=method, **method_kwargs)
        self._meta['cluster_id'] = labels

        # Optimize distance & rank
        if dist_rank_filter is True:
            if dist_rmse_kwargs is None:
                dist_rmse_kwargs = {}

            new_labels = self._dist_rank_filter(**dist_rmse_kwargs)
            self._meta['cluster_id'] = new_labels

        # Apply contiguous filter
        if contiguous_filter is True:
            if contiguous_kwargs is None:
                contiguous_kwargs = {}

            new_labels = self._contiguous_filter(**contiguous_kwargs)
            self._meta['cluster_id'] = new_labels

        self._calculate_ranks()

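    # Illustrative sketch (not part of reVX): step 2 above assigns each
    # point to the cluster minimizing normalized dist**2 + rmse**2. With
    # hypothetical pre-normalized (n_clusters, n_points) arrays:
    #
    #     import numpy as np
    #
    #     dist = np.array([[0.1, 0.9], [0.8, 0.2]])  # point-to-centroid
    #     rmse = np.array([[0.2, 0.7], [0.9, 0.1]])  # coefficient mismatch
    #     err = dist ** 2 + rmse ** 2
    #     labels = np.argmin(err, axis=0)            # -> array([0, 1])
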
    @classmethod
    def cluster(cls, cf_h5_path, region_gen_gids, n_clusters,
                method='kmeans', method_kwargs=None, dist_rank_filter=True,
                dist_rmse_kwargs=None, contiguous_filter=True,
                contiguous_kwargs=None, region=None):
        """
        Entry point for RPMClusters to get clusters for a given region
        defined as a list | array of gen_gids

        Parameters
        ----------
        cf_h5_path : str
            Path to reV .h5 file containing desired capacity factor profiles
        region_gen_gids : list | ndarray
            List or vector of gen_gids to cluster on
        n_clusters : int
            Number of clusters to identify
        method : str
            Method to use to cluster coefficients
        method_kwargs : dict
            Kwargs for running _cluster_coefficients
        dist_rank_filter : bool
            Re-cluster data by minimizing the sum of the:
            - distance between each point and each cluster centroid
            - RMSE between each point's coefficients and each cluster's
              representative (mean) coefficients
        dist_rmse_kwargs : dict
            Kwargs for running _dist_rank_optimization
        contiguous_filter : bool
            Re-classify clusters by making contiguous cluster polygons
        contiguous_kwargs : dict
            Kwargs for _contiguous_filter
        region : str | None
            Optional region identifier that you are clustering on for
            better debugging.

        Returns
        -------
        out : pandas.DataFrame
            Cluster results: (gen_gid, lon, lat, cluster_id, rank)

        Examples
        --------
        >>> from reV import Resource
        >>>
        >>> fname = '$TESTDATADIR/reV_gen/gen_pv_2012.h5'
        >>> with Resource(fname) as res:
        ...     gen_gids = res.meta.index.values
        >>>
        >>> RPMClusters.cluster(fname, gen_gids, n_clusters=6)
            gen_gid   latitude  longitude  cluster_id                    geometry
        0         0  41.290001 -71.860001           0  POINT (-71.86000 41.29000)
        1         1  41.290001 -71.820000           0  POINT (-71.82000 41.29000)
        2         2  41.250000 -71.820000           4  POINT (-71.82000 41.25000)
        3         3  41.330002 -71.820000           0  POINT (-71.82000 41.33000)
        4         4  41.369999 -71.820000           0  POINT (-71.82000 41.37000)
        ..      ...        ...        ...         ...                         ...
        95       95  41.250000 -71.660004           4  POINT (-71.66000 41.25000)
        96       96  41.889999 -71.660004           5  POINT (-71.66000 41.89000)
        97       97  41.450001 -71.660004           3  POINT (-71.66000 41.45000)
        98       98  41.610001 -71.660004           1  POINT (-71.66000 41.61000)
        99       99  41.410000 -71.660004           3  POINT (-71.66000 41.41000)
        """
        clusters = cls(cf_h5_path, region_gen_gids, n_clusters,
                       region=region)
        try:
            clusters._cluster(method=method, method_kwargs=method_kwargs,
                              dist_rank_filter=dist_rank_filter,
                              dist_rmse_kwargs=dist_rmse_kwargs,
                              contiguous_filter=contiguous_filter,
                              contiguous_kwargs=contiguous_kwargs)
        except Exception as e:
            logger.exception('Clustering failed on region "{}" with '
                             'gen_gids {} through {}, error message: "{}"'
                             .format(region, np.min(region_gen_gids),
                                     np.max(region_gen_gids), e))

        return clusters.meta

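# Illustrative sketch (not part of reVX): RPMClusters.cluster operates on one
# region at a time, so multi-region results can be combined by concatenating
# the returned meta frames (hypothetical gid mapping):
#
#     import pandas as pd
#
#     region_gids = {'east': [0, 1, 2, 3], 'west': [4, 5, 6, 7]}
#     frames = [RPMClusters.cluster(fname, gids, n_clusters=2, region=name)
#               for name, gids in region_gids.items()]
#     all_clusters = pd.concat(frames, ignore_index=True)

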
class RPMWavelets:
    """Base class for RPM wavelets"""

    @classmethod
    def get_dwt_coefficients(cls, x, wavelet='Haar', level=None,
                             indices=None):
        """
        Collect wavelet coefficients for time series <x> using
        mother wavelet <wavelet> at levels <level>.

        Parameters
        ----------
        x : ndarray
            time series values
        wavelet : string
            mother wavelet type
        level : int
            optional wavelet computation level
        indices : ndarray
            coefficient array levels to keep

        Returns
        -------
        ndarray
            stacked coefficients at <indices>
        """
        # set mother wavelet
        _wavelet = pywt.Wavelet(wavelet)

        # multi-level decomposition with default depth
        logger.debug('Calculating wavelet coefficients'
                     ' with {w} wavelet'.format(w=_wavelet.family_name))
        _wavedec = pywt.wavedec(data=x, wavelet=_wavelet, axis=1,
                                level=level)

        return cls._subset_coefficients(x=_wavedec,
                                        gid_count=x.shape[0],
                                        indices=indices)

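    # Illustrative note: pywt.wavedec returns a list ordered
    # [cA_n, cD_n, ..., cD_1] (approximation coefficients first, then detail
    # coefficients from coarsest to finest). For example:
    #
    #     import numpy as np
    #     import pywt
    #
    #     coeffs = pywt.wavedec(np.random.rand(4, 16), 'haar', axis=1,
    #                           level=2)
    #     [c.shape for c in coeffs]  # -> [(4, 4), (4, 4), (4, 8)]
    #
    # The <indices> argument selects which of these arrays to keep.
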
    @staticmethod
    def _subset_coefficients(x, gid_count, indices=None):
        """
        Subset and stack wavelet coefficients

        Parameters
        ----------
        x : list
            list of coefficient arrays from pywt.wavedec
        gid_count : int
            number of area ID values
        indices : ndarray
            coefficient array levels to keep

        Returns
        -------
        ndarray
            stacked coefficients, converted to integers
        """
        indices = indices or range(0, len(x))

        _coefficient_count = 0
        for _index in indices:
            _shape = x[_index].shape
            _coefficient_count += _shape[1]

        _combined_wc = np.empty(shape=(gid_count, _coefficient_count),
                                dtype=np.int32)

        logger.debug('{c:d} coefficients'.format(c=_coefficient_count))

        # stack the selected coefficient arrays side by side, one row per gid
        _i_start = 0
        for _index in indices:
            _i_end = _i_start + x[_index].shape[1]
            _combined_wc[:, _i_start:_i_end] = np.round(x[_index])
            _i_start = _i_end

        return _combined_wc
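
# Illustrative sketch (not part of reVX): stacking all coefficient arrays for
# an hourly annual profile gives one row of int32 coefficients per gid, e.g.
#
#     import numpy as np
#
#     profiles = np.random.rand(10, 8760)  # 10 gids, 8760 hourly values
#     coeffs = RPMWavelets.get_dwt_coefficients(profiles)
#     coeffs.shape[0] == 10  # one stacked coefficient row per gid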