"""revrt statistic computation functions"""
import sys
from itertools import chain
from functools import partial
from enum import StrEnum, auto
from collections import defaultdict
import numpy as np
import rasterio.features
from shapely.geometry import shape
from revrt.exceptions import revrtValueError, revrtNotImplementedError
_PCT_PREFIX = "percentile_"
def _not_implemented(stat_name, *__, **___):
"""Default function for stats which may not have a computation"""
msg = f"Default computation unavailable for {stat_name!r}"
raise revrtNotImplementedError(msg)
def _calc_count(processed_raster, **__):
"""Compute number of non-NaN elements"""
try:
return int(processed_raster.count())
except (AttributeError, TypeError):
return int(np.count_nonzero(~np.isnan(processed_raster)))
def _calc_min(processed_raster, **__):
"""Compute minimum value in raster"""
return float(np.nanmin(processed_raster))
def _calc_max(processed_raster, **__):
"""Compute maximum value in raster"""
return float(np.nanmax(processed_raster))
def _calc_mean(processed_raster, out_dtype, **__):
"""Compute mean value of raster"""
return float(np.nanmean(processed_raster, dtype=out_dtype))
def _calc_sum(processed_raster, out_dtype, **__):
"""Compute sum of raster"""
return float(np.nansum(processed_raster, dtype=out_dtype))
def _calc_std(processed_raster, **__):
"""Compute std of raster"""
return float(np.nanstd(processed_raster))
def _calc_median(processed_raster, **__):
"""Compute median of raster"""
return float(np.nanmedian(processed_raster))
def _calc_majority(pixel_count, **__):
"""Compute value that makes up majority of raster"""
return max(pixel_count, key=pixel_count.get)
def _calc_minority(pixel_count, **__):
"""Compute value that makes up minority of raster"""
return min(pixel_count, key=pixel_count.get)
def _calc_unique(pixel_count, **__):
"""Compute number of unique values in raster"""
return len(pixel_count)
def _calc_range(processed_raster, feature_stats, **__):
"""Get the range from stats using min/max, compute if not present"""
try:
r_min = feature_stats["min"]
except KeyError:
r_min = float(np.nanmin(processed_raster))
try:
r_max = feature_stats["max"]
except KeyError:
r_max = float(np.nanmax(processed_raster))
return r_max - r_min
def _calc_nodata(masked_raster, nodata, **__):
"""Compute number of nodata pixels in raster"""
try:
return float(
np.sum(
(masked_raster == nodata)
| (masked_raster == masked_raster.attrs.get("nodata"))
)
)
except AttributeError:
return float(np.sum(masked_raster == nodata))
[docs]
class Stat(StrEnum):
"""Enum of basic computable statistics"""
COUNT = auto()
"""Compute number of non-NaN elements"""
MIN = auto()
"""Compute minimum value in zone"""
MAX = auto()
"""Compute maximum value in zone"""
MEAN = auto()
"""Compute mean value of zone"""
SUM = auto()
"""Compute sum of zone"""
STD = auto()
"""Compute std of zone"""
MEDIAN = auto()
"""Compute median of zone"""
MAJORITY = auto()
"""Compute pixel value that makes up majority of zone"""
MINORITY = auto()
"""Compute pixel value that makes up minority of zone"""
UNIQUE = auto()
"""Compute number of unique pixel values in zone"""
RANGE = auto()
"""Compute range of pixel values in zone"""
NODATA = auto()
"""Compute number of nodata pixels in zone"""
PIXEL_COUNT = auto()
"""Compute count for each pixel value in zone"""
def __new__(cls, value): # noqa: PLR0912, C901
"""Create new enum member"""
obj = str.__new__(cls, value)
obj._value_ = value
match value:
case "count":
obj.compute = _calc_count
obj.requires_pixel_count = True
case "min":
obj.compute = _calc_min
obj.requires_pixel_count = False
case "max":
obj.compute = _calc_max
obj.requires_pixel_count = False
case "mean":
obj.compute = _calc_mean
obj.requires_pixel_count = False
case "sum":
obj.compute = _calc_sum
obj.requires_pixel_count = False
case "std":
obj.compute = _calc_std
obj.requires_pixel_count = False
case "median":
obj.compute = _calc_median
obj.requires_pixel_count = False
case "majority":
obj.compute = _calc_majority
obj.requires_pixel_count = True
case "minority":
obj.compute = _calc_minority
obj.requires_pixel_count = True
case "unique":
obj.compute = _calc_unique
obj.requires_pixel_count = True
case "range":
obj.compute = _calc_range
obj.requires_pixel_count = False
case "nodata":
obj.compute = _calc_nodata
obj.requires_pixel_count = False
case "pixel_count":
obj.compute = partial(_not_implemented, value)
obj.requires_pixel_count = True
case _: # pragma: no cover
obj.compute = partial(_not_implemented, value)
obj.requires_pixel_count = False
return obj
[docs]
class FractionalStat(StrEnum):
"""Enum of fractional pixel statistics"""
FRACTIONAL_PIXEL_COUNT = auto()
"""Compute fractional pixel count for each pixel value in zone"""
FRACTIONAL_AREA = auto()
"""Compute fractional area for each pixel value in zone"""
VALUE_MULTIPLIED_BY_FRACTIONAL_AREA = auto()
"""Compute fractional pixel * area for each pixel value in zone"""
[docs]
class ComputableStats:
"""A class to represent computable statistics for zonal stats"""
def __init__(
self, base_stats=None, percentiles=None, fractional_stats=None
):
"""
Parameters
----------
base_stats : iterable of str, optional
Iterable of "base" statistics to compute (i.e. not
fractional or percentile). By default, ``None``.
percentiles : dict, optional
Dictionary mapping the percentile stat name to the int or
float representing the percentile that can be passed to
:func:`np.percentile`. By default, ``None``.
fractional_stats : iterable of str, optional
One or mre stats from the :class:`FractionalStat` enum to
compute. By default, ``None``.
"""
self.base_stats = base_stats or []
self.percentiles = percentiles or {}
self.fractional_stats = fractional_stats or []
self.all = {
*self.base_stats,
*self.percentiles,
*self.fractional_stats,
}
def __contains__(self, key):
return key in self.all
@property
def empty(self):
"""dict: Dict with empty stat values"""
out = {
k: (0 if k in {Stat.COUNT, Stat.PIXEL_COUNT} else None)
for k in chain(self.base_stats, self.fractional_stats)
}
for pct in self.percentiles:
out[pct] = None
return out
[docs]
def lazy_pixel_count(self, processed_raster):
"""Compute pixel counts from a raster
The output dictionary will be empty if this stats collection
does not contain any stats that require a pixel count.
Parameters
----------
processed_raster : array-like
List or array of data to compute pixel counts from. This
collection can contain NaN values, which will be ignored in
the final output.
Returns
-------
dict
Dictionary where keys are the unique array entries and the
values are the counts for each entry. This dictionary will
be empty if this stats collection does not contain any stats
that require a pixel count.
"""
if not any(stat.requires_pixel_count for stat in self.base_stats):
return {}
keys, counts = np.unique(processed_raster, return_counts=True)
return {
k.item(): c.item()
for k, c in zip(keys, counts, strict=False)
if not np.isnan(k)
}
[docs]
def computed_base_stats(
self, processed_raster, category_map=None, **kwargs
):
"""Compute statistics on array
Parameters
----------
processed_raster : array-like
Array to compute statistics for. This collection can
contain NaN values, which will be ignored in the final
output.
category_map : dict, optional
Dictionary mapping raster values to new names. If given,
this mapping will be applied to the pixel count dictionary,
so you can use it to map raster values to human-readable
category names. By default, ``None``.
**kwargs
Extra keyword-argument pairs to pass to statistic
computation functions. Currently this is only necessary for
the `nodata` stat, which requires the following extra
parameters:
- `masked_raster`: Raster masked to zone, but still
containing the "nodata" value (i.e.
it's not masked out with NaNs).
- `nodata`: Value representing `nodata` that will be
counted for this statistic.
Returns
-------
dict
Dictionary of computed statistics.
"""
category_map = category_map or {}
pixel_count = self.lazy_pixel_count(processed_raster)
pixel_count = {
category_map.get(k, k): v for k, v in pixel_count.items()
}
out_dtype = _out_dtype_from_raster(processed_raster)
feature_stats = {}
for stat in self.base_stats:
if stat == Stat.PIXEL_COUNT:
feature_stats[stat] = pixel_count
continue
feature_stats[stat] = stat.compute(
processed_raster=processed_raster,
pixel_count=pixel_count,
feature_stats=feature_stats,
out_dtype=out_dtype,
**kwargs,
)
return feature_stats
[docs]
def computed_fractional_stats(
self, zone, processed_raster, transform, nodata=None, category_map=None
):
"""Compute fractional statistics on array
Parameters
----------
zone : shapely.geometry
Geometry object representing the zone to compute statistics
over.
processed_raster : array-like
Array to compute statistics for. This collection can
contain NaN values, which will be ignored in the final
output.
transform : affine.Affine
Affine transform object representing the raster
transformation. This is used to compute the raster shapes
for statistics computations as well as the pixel area
for each pixel value in the raster.
nodata : int | float, optional
Value representing "nodata" in the array. These values in
the raster will be ignored and will not contribute to the
statistics and will not be reported. By default, ``None``.
category_map : dict, optional
Optional mapping for raster values. If given, the outputs
will be labeled using the categories in this mapping instead
of the raster values directly. The map does not have to
be exhaustive - missing values will just be reported using
the raw array value. By default, ``None``.
Returns
-------
dict
Dictionary of computed fractional statistics. The keys are
the names of the statistics in the :class:`FractionalStat`
enum and the values are the computed statistics.
"""
if not self.fractional_stats:
return {}
frac_stats = _fractional_stats(
zone, processed_raster, transform, nodata, category_map
)
return {stat: frac_stats[stat] for stat in self.fractional_stats}
[docs]
def computed_percentiles(self, processed_raster):
"""Generate percentile statistics for array
Parameters
----------
processed_raster : array-like
Array to compute percentiles for.
Yields
------
str
Name of percentile stat
int | float
Value representing that percentile in the array.
"""
return {
stat: np.nanpercentile(processed_raster, pct)
for stat, pct in self.percentiles.items()
}
[docs]
@classmethod
def from_iter(cls, stats=None):
"""Create a ComputableStats object from an iterable of stats
Parameters
----------
stats : str | iterable of str, optional
Names of all statistics to compute. Statistics must be one
of the members of :class:`Stat` or :class:`FractionalStat`,
or must start with the ``percentile_`` prefix and end with
an int or float representing the percentile to compute (e.g.
``percentile_10.5``). If only one statistic is to be
computed, you can provide it directly as a string.
Otherwise, provide a list of statistic names or a string
with the names separated by a space. You can also provide
the string ``"ALL"`` or ``"*"`` to specify that all
statistics should be computed. If no input, empty input, or
``None`` is provided, then only the base stats ("count",
"min", "max", "mean") are configured. To summarize, all of
the following are valid inputs:
- ``stats="*"`` or ``stats="ALL"`` or ``stats="All"``
- ``stats="min"``
- ``stats="min max"``
- ``stats=["min"]``
- ``stats=["min", "max", "percentile_10.5"]``
By default, ``None``.
Returns
-------
ComputableStats
An initialized :class:`ComputableStats` object.
Raises
------
revrtValueError
If one or more input stats are not known.
"""
if not stats:
return cls(base_stats=[Stat.COUNT, Stat.MIN, Stat.MAX, Stat.MEAN])
if isinstance(stats, str):
if stats.casefold() in {"*", "all"}:
return cls(
base_stats=list(Stat),
fractional_stats=list(FractionalStat),
)
stats = stats.split()
allowed_base_stats = {*Stat}
allowed_fractional_stats = {*FractionalStat}
base_stats = []
percentiles = {}
fractional_stats = []
for stat in stats:
stat_name = stat.casefold()
if stat_name.startswith(_PCT_PREFIX):
percentiles[stat_name] = _get_percentile(stat_name)
elif stat_name in allowed_fractional_stats:
fractional_stats.append(stat_name)
elif stat_name in allowed_base_stats:
base_stats.append(Stat(stat_name))
else:
valid_stats = {str(s) for s in Stat}
valid_stats |= {str(s) for s in FractionalStat}
msg = (
f"Stat {stat!r} not valid; must be one of:\n"
f"{valid_stats!r}"
)
raise revrtValueError(msg)
return cls(base_stats, percentiles, fractional_stats)
def _get_percentile(stat):
"""Get the percentile value from user string input"""
q = float(stat.replace(_PCT_PREFIX, ""))
if not (0 <= q <= 100): # noqa: PLR2004
msg = f"Percentiles must be between 0 and 100 (inclusive). Got: {q}"
raise revrtValueError(msg)
return q
def _out_dtype_from_raster(raster): # pragma: no cover
"""Get the output dtype for stats like mean and std
If we're on a 64 bit platform and the array is an integer type, this
function ensures that the array is cast to 64 bit to avoid overflow
for certain numpy ops
"""
if sys.maxsize > 2**32 and issubclass(raster.dtype.type, np.integer):
return "int64"
return None # numpy default
def _fractional_stats(
zonal_polygon, window_array, window_transform, nodata, category_map
):
"""Compute fractional statistics on array"""
value_multiplied_by_area = 0.0
category_map = category_map or {}
returned_value_pixel_count_dict = defaultdict(float)
returned_value_pixel_area_dict = defaultdict(float)
pixel_area = _compute_pixel_area(window_transform)
raster_shapes = rasterio.features.shapes(
window_array,
mask=((window_array != nodata) & (~np.isnan(window_array))),
connectivity=4,
transform=window_transform,
)
for geom, value in raster_shapes:
raster_poly = shape(geom)
intersection = raster_poly.intersection(zonal_polygon)
if not intersection.is_empty:
intersection_area = intersection.area
returned_value_pixel_area_dict[value] += intersection_area
value_multiplied_by_area += value * intersection_area
pixel_fractional_count = intersection_area / pixel_area
returned_value_pixel_count_dict[value] += pixel_fractional_count
returned_value_pixel_count_dict = {
category_map.get(k, k): v
for k, v in returned_value_pixel_count_dict.items()
}
returned_value_pixel_area_dict = {
category_map.get(k, k): v
for k, v in returned_value_pixel_area_dict.items()
}
return {
FractionalStat.FRACTIONAL_PIXEL_COUNT: returned_value_pixel_count_dict,
FractionalStat.FRACTIONAL_AREA: returned_value_pixel_area_dict,
FractionalStat.VALUE_MULTIPLIED_BY_FRACTIONAL_AREA: (
value_multiplied_by_area
),
}
def _compute_pixel_area(affine):
"""Compute the pixel area from the affine transform"""
width, _, _, _, height, *_ = affine
return abs(width) * abs(height)