"""
Region Classifier Module
"""
import logging
import warnings
import geopandas as gpd
import pandas as pd
from scipy.spatial import cKDTree
from shapely.geometry import Point
from shapely.geometry import shape
from reVX.utilities.utilities import log_versions
from rex import Resource
logger = logging.getLogger(__name__)
[docs]class RegionClassifier():
"""
Base class of region classification
Examples
--------
>>> meta_path = 'meta.csv'
>>> regions_path = 'us_states.shp'
>>> regions_label = 'NAME'
>>>
>>> classifier = RegionClassifier(meta_path=meta_path,
regions_path=regions_path,
regions_label=regions_label)
>>>
>>> force = True
>>> fout = 'new_meta.csv'
>>>
>>> classification = classifier.classify(force=force)
>>> classifier.output_to_csv(classification, fout)
"""
CRS = "EPSG:4326"
DEFAULT_REGIONS_LABEL = 'regions_index'
def __init__(self, meta_path, regions, regions_label=None,
outlier_value=-999):
"""
Parameters
----------
meta_path : str | pandas.DataFrame
Path to meta CSV file, resource .h5 file, or pre-loaded meta
DataFrame containing lat/lon points
regions : str | GeoDataFrame
Path to regions shapefile containing labeled geometries or
a pre-loaded GeoDataFrame
regions_label : str
Attribute to use as label in the regions shapefile
outlier_value : float | int | str
Value to assign to outliers if not force
"""
log_versions(logger)
self._regions_label = regions_label
if self._regions_label is None:
self._regions_label = self.DEFAULT_REGIONS_LABEL
self._meta = self._get_meta(meta_path)
self._regions = self._get_regions(regions, self._regions_label)
self._outlier_value = outlier_value
@property
def regions(self):
"""Get the regions GeoDataFrame
Returns
-------
GeoDataFrame
"""
return self._regions
[docs] @staticmethod
def output_to_csv(gdf, path):
""" Export a geopandas dataframe to csv
Parameters
----------
gdf : GeoPandas DataFrame
Meta data to export
path : str
Output CSV file path for labeled meta CSV file
"""
output_gdf = gdf.drop('geometry', axis=1)
if output_gdf.index.name == 'gid':
output_gdf = output_gdf.reset_index()
output_gdf.to_csv(path, index=False)
@classmethod
def _get_regions(cls, regions, regions_label):
""" Load the regions shapefile into geopandas dataframe
Parameters
----------
regions : str | GeoDataFrame
Path to regions shapefile containing labeled geometries or
a pre-loaded GeoDataFrame
regions_label : str
Attribute to use as label in the regions shapefile
"""
if not isinstance(regions, gpd.GeoDataFrame):
regions = gpd.read_file(regions).to_crs(cls.CRS)
if regions_label not in regions.columns:
regions_label = cls.DEFAULT_REGIONS_LABEL
regions[regions_label] = regions.index
logger.warning('Setting regions label: {}'
.format(regions_label))
# Centroids used later to deal with points that fall outside regions
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=UserWarning)
centroids = regions.geometry.centroid
regions['longitude'] = centroids.x
regions['latitude'] = centroids.y
return regions
@classmethod
def _get_meta(cls, meta_path):
""" Load the meta csv file into geopandas dataframe
Parameters
----------
meta_path : str | pandas.DataFrame
Path to meta CSV file, resource .h5 file, or pre-loaded meta
DataFrame containing lat/lon points
"""
if isinstance(meta_path, str):
if meta_path.endswith('.csv'):
meta = pd.read_csv(meta_path)
elif meta_path.endswith('.h5'):
with Resource(meta_path) as f:
meta = f.meta
else:
msg = ("Cannot parse meta data from {}, expecting a .csv or "
".h5 file!".format(meta_path))
logger.error(msg)
raise RuntimeError(msg)
elif isinstance(meta_path, pd.DataFrame):
meta = meta_path
else:
msg = ("Cannot parse meta data from {}, expecting a .csv or "
".h5 file path, or a pre-loaded pandas DataFrame"
.format(meta_path))
logger.error(msg)
raise RuntimeError(msg)
lat_label, long_label = cls._get_lat_lon_labels(meta)
geometry = [Point(xy) for xy in zip(meta[long_label],
meta[lat_label])]
meta = gpd.GeoDataFrame(meta, crs=cls.CRS,
geometry=geometry)
return meta
@staticmethod
def _get_lat_lon_labels(df):
""" Auto detect the latitude and longitude columns from DataFrame
Parameters
----------
df : Pandas DataFrame
Meta data with the latitude/longitude columns to detect
"""
# Latitude
lat_col = [c for c in df if c.lower().startswith('lat')]
if len(lat_col) > 1:
msg = "Multiple latitude columns found: {}".format(lat_col)
logger.error(msg)
raise RuntimeError(msg)
lat_col = lat_col[0]
# Longitude
lon_col = [c for c in df if c.lower().startswith('lon')]
if len(lon_col) > 1:
msg = "Multiple longitude columns found: {}".format(lon_col)
logger.error(msg)
raise RuntimeError(msg)
lon_col = lon_col[0]
return [lat_col, lon_col]
@staticmethod
def _nearest(target, lookup):
""" Lookup the indices to the nearest point
Parameters
----------
target : Pandas DataFrame
List of lat/lon points
lookup : Pandas DataFrame
List of lat/lon points
"""
tree = cKDTree(lookup) # pylint: disable=not-callable
_, inds = tree.query(target, k=1)
return inds
@staticmethod
def _geom_is_valid(geom):
""" Check if individual geometry is valid """
try:
shape(geom)
return 1
except AttributeError:
return 0
[docs] def classify(self, force=False):
""" Classify the meta data with regions labels
Parameters
----------
force : str
Force outlier classification by finding nearest
"""
try:
# Get intersection classifications
meta_inds, region_inds, outlier_inds = self._intersect()
except Exception as e:
logger.warning(e)
invalid_geom_ids = self._check_geometry()
if invalid_geom_ids:
logger.warning('The following geometries are invalid: {}'
.format(invalid_geom_ids))
else:
logger.exception('Cannot run region classification')
raise
classified_meta = self._meta.copy()
classified_meta[self._regions_label] = self._outlier_value
region_labels = self._regions.loc[region_inds, self._regions_label]
region_labels = list(region_labels)
classified_meta.loc[meta_inds, self._regions_label] = region_labels
# Check for any intersection outliers
num_outliers = len(outlier_inds)
if (num_outliers and force):
# Lookup the nearest region geometry (by centroid)
logger.warning('The following points are outliers:')
logger.warning(outlier_inds)
cols = self._get_lat_lon_labels(self._meta)
lookup = self._regions[['latitude', 'longitude']].values
target = self._meta.loc[outlier_inds][cols].values
out_inds = list(self._nearest(target, lookup))
regions = self._regions.loc[out_inds, self._regions_label]
regions = list(regions)
classified_meta.loc[outlier_inds, self._regions_label] = regions
return classified_meta
def _intersect(self):
""" Join the meta points to regions by spatial intersection """
joined = gpd.sjoin(self._meta, self._regions,
how='inner', predicate='intersects')
if 'index_left' in joined.columns:
joined = joined.drop_duplicates('index_left', keep='last')
meta_inds = list(joined['index_left'])
else:
meta_inds = list(joined.index)
region_inds = list(joined['index_right'])
outliers = self._meta.loc[~self._meta.index.isin(meta_inds)]
outlier_inds = list(outliers.index)
return meta_inds, region_inds, outlier_inds
def _check_geometry(self):
""" Get index list of invalid geometries """
geometry = self._regions.geometry
isvalid = geometry.apply(self._geom_is_valid)
return list(self._regions[isvalid == 0].index)
[docs] @classmethod
def run(cls, meta_path, regions, regions_label=None,
force=False, fout=None):
""" Run full classification
Parameters
----------
meta_path : str | pandas.DataFrame
Path to meta CSV file, resource .h5 file, or pre-loaded meta
DataFrame containing lat/lon points
regions : str | GeoDataFrame
Path to regions shapefile containing labeled geometries or
a pre-loaded GeoDataFrame
regions_label : str
Attribute to use a label in the regions shapefile
force : str
Force outlier classification by finding nearest
fout : str
Output CSV file path for labeled meta CSV file
Examples
--------
>>> meta_path = 'meta.csv'
>>> regions_path = 'us_states.shp'
>>> regions_label = 'NAME'
>>> force = True
>>> fout = 'new_meta.csv'
>>>
>>> RegionClassifier.run(meta_path=meta_path,
regions_path=regions_path,
regions_label=regions_label
force=force, fout=fout)
"""
classifier = cls(meta_path=meta_path, regions=regions,
regions_label=regions_label)
classification = classifier.classify(force=force)
if fout:
cls.output_to_csv(classification, fout)
return classification