# -*- coding: utf-8 -*-
"""
reV quality assurance and control classes
"""
import logging
import os
from warnings import warn
import numpy as np
import pandas as pd
from gaps.status import Status
from reV.qa_qc.summary import (
ExclusionsMask,
SummarizeH5,
SummarizeSupplyCurve,
SummaryPlots,
SupplyCurvePlot,
)
from reV.supply_curve.exclusions import ExclusionMaskFromDict
from reV.utilities import ModuleName, SupplyCurveField, log_versions
from reV.utilities.exceptions import PipelineError
logger = logging.getLogger(__name__)
[docs]class QaQc:
"""
reV QA/QC
"""
def __init__(self, out_dir):
"""
Parameters
----------
out_dir : str
Directory path to save summary data and plots too
"""
log_versions(logger)
logger.info("QA/QC results to be saved to: {}".format(out_dir))
if not os.path.exists(out_dir):
os.makedirs(out_dir, exist_ok=True)
self._out_dir = out_dir
@property
def out_dir(self):
"""
Output directory
Returns
-------
str
"""
return self._out_dir
@staticmethod
def _scatter_plot(
summary_csv, out_root, plot_type="plotly", cmap="viridis", **kwargs
):
"""
Create scatter plot for all summary stats in summary table and save to
out_dir
Parameters
----------
summary_csv : str
Path to .csv file containing summary table
out_root : str
Output directory to save plots to
plot_type : str, optional
plot_type of plot to create 'plot' or 'plotly', by default 'plotly'
cmap : str, optional
Colormap name, by default 'viridis'
kwargs : dict
Additional plotting kwargs
"""
out_dir = os.path.join(
out_root, os.path.basename(summary_csv).rstrip(".csv")
)
if not os.path.exists(out_dir):
os.makedirs(out_dir, exist_ok=True)
SummaryPlots.scatter_all(
summary_csv, out_dir, plot_type=plot_type, cmap=cmap, **kwargs
)
[docs] def create_scatter_plots(
self, plot_type="plotly", cmap="viridis", **kwargs
):
"""
Create scatter plot for all compatible summary .csv files
Parameters
----------
plot_type : str, optional
plot_type of plot to create 'plot' or 'plotly', by default 'plotly'
cmap : str, optional
Colormap name, by default 'viridis'
kwargs : dict
Additional plotting kwargs
"""
for file in os.listdir(self.out_dir):
if file.endswith(".csv"):
summary_csv = os.path.join(self.out_dir, file)
summary = pd.read_csv(summary_csv)
has_right_cols = ("gid" in summary
and SupplyCurveField.LATITUDE in summary
and SupplyCurveField.LONGITUDE in summary)
if has_right_cols:
self._scatter_plot(summary_csv, self.out_dir,
plot_type=plot_type, cmap=cmap,
**kwargs)
[docs] @classmethod
def h5(
cls,
h5_file,
out_dir,
dsets=None,
group=None,
process_size=None,
max_workers=None,
plot_type="plotly",
cmap="viridis",
**kwargs,
):
"""
Run QA/QC by computing summary stats from dsets in h5_file and
plotting scatters plots of compatible summary stats
Parameters
----------
h5_file : str
Path to .h5 file to run QA/QC on
out_dir : str
Directory path to save summary tables and plots too
dsets : str | list, optional
Datasets to summarize, by default None
group : str, optional
Group within h5_file to summarize datasets for, by default None
process_size : int, optional
Number of sites to process at a time, by default None
max_workers : int, optional
Number of workers to use when summarizing 2D datasets,
by default None
plot_type : str, optional
plot_type of plot to create 'plot' or 'plotly', by default 'plotly'
cmap : str, optional
Colormap name, by default 'viridis'
kwargs : dict
Additional plotting kwargs
"""
try:
qa_qc = cls(out_dir)
SummarizeH5.run(
h5_file,
out_dir,
group=group,
dsets=dsets,
process_size=process_size,
max_workers=max_workers,
)
qa_qc.create_scatter_plots(
plot_type=plot_type, cmap=cmap, **kwargs
)
except Exception as e:
logger.exception(
"QAQC failed on file: {}. Received exception:\n{}".format(
os.path.basename(h5_file), e
)
)
raise e
else:
logger.info(
"Finished QAQC on file: {} output directory: {}".format(
os.path.basename(h5_file), out_dir
)
)
[docs] @classmethod
def supply_curve(cls, sc_table, out_dir, columns=None,
lcoe=SupplyCurveField.MEAN_LCOE, plot_type='plotly',
cmap='viridis', sc_plot_kwargs=None,
scatter_plot_kwargs=None):
"""
Plot supply curve
Parameters
----------
sc_table : str
Path to .csv file containing supply curve table
out_dir : str
Directory path to save summary tables and plots too
columns : str | list, optional
Column(s) to summarize, if None summarize all numeric columns,
by default None
lcoe : str, optional
LCOE value to plot, by default :obj:`SupplyCurveField.MEAN_LCOE`
plot_type : str, optional
plot_type of plot to create 'plot' or 'plotly', by default 'plotly'
cmap : str, optional
Colormap name, by default 'viridis'
sc_plot_kwargs : dict, optional
Kwargs for supply curve plot, by default None
scatter_plot_kwargs : dict
Kwargs for scatter plot, by default None
"""
if sc_plot_kwargs is None:
sc_plot_kwargs = {}
if scatter_plot_kwargs is None:
scatter_plot_kwargs = {}
try:
qa_qc = cls(out_dir)
SummarizeSupplyCurve.run(sc_table, out_dir, columns=columns)
SupplyCurvePlot.plot(
sc_table,
out_dir,
plot_type=plot_type,
lcoe=lcoe,
**sc_plot_kwargs,
)
qa_qc._scatter_plot(
sc_table,
out_dir,
plot_type=plot_type,
cmap=cmap,
**scatter_plot_kwargs,
)
except Exception as e:
logger.exception(
"QAQC failed on file: {}. Received exception:\n{}".format(
os.path.basename(sc_table), e
)
)
raise e
else:
logger.info(
"Finished QAQC on file: {} output directory: {}".format(
os.path.basename(sc_table), out_dir
)
)
[docs] @classmethod
def exclusions_mask(
cls,
excl_h5,
out_dir,
layers_dict=None,
min_area=None,
kernel="queen",
hsds=False,
plot_type="plotly",
cmap="viridis",
plot_step=100,
**kwargs,
):
"""
Create inclusion mask from given layers dictionary, dump to disk and
plot
Parameters
----------
excl_h5 : str
Path to exclusions .h5 file
layers_dict : dict | NoneType
Dictionary of LayerMask arugments {layer: {kwarg: value}}
min_area : float | NoneType
Minimum required contiguous area in sq-km
kernel : str
Contiguous filter method to use on final exclusions
hsds : bool
Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS
behind HSDS
plot_type : str, optional
plot_type of plot to create 'plot' or 'plotly', by default 'plotly'
cmap : str, optional
Colormap name, by default 'viridis'
plot_step : int
Step between points to plot
kwargs : dict
Additional plotting kwargs
"""
try:
cls(out_dir)
excl_mask = ExclusionMaskFromDict.run(
excl_h5,
layers_dict=layers_dict,
min_area=min_area,
kernel=kernel,
hsds=hsds,
)
excl_mask = np.round(excl_mask * 100).astype("uint8")
out_file = os.path.basename(excl_h5).replace(".h5", "_mask.npy")
out_file = os.path.join(out_dir, out_file)
np.save(out_file, excl_mask)
ExclusionsMask.plot(
excl_mask,
out_dir,
plot_type=plot_type,
cmap=cmap,
plot_step=plot_step,
**kwargs,
)
except Exception as e:
logger.exception(
"QAQC failed on file: {}. Received exception:\n{}".format(
os.path.basename(excl_h5), e
)
)
raise e
else:
logger.info(
"Finished QAQC on file: {} output directory: {}".format(
os.path.basename(excl_h5), out_dir
)
)
[docs]class QaQcModule:
"""Class to handle Module QA/QC"""
def __init__(self, module_name, config, out_root):
"""
Parameters
----------
config : dict
Dictionary with pre-extracted config input group.
"""
if not isinstance(config, dict):
raise TypeError(
"Config input must be a dict but received: {}".format(
type(config)
)
)
self._name = module_name
self._config = config
self._out_root = out_root
self._default_plot_type = "plotly"
self._default_cmap = "viridis"
self._default_plot_step = 100
self._default_lcoe = SupplyCurveField.MEAN_LCOE
self._default_area_filter_kernel = 'queen'
@property
def fpath(self):
"""Get the reV module output filepath(s)
Returns
-------
fpaths : str | list
One or more filepaths output by current module being QA'd
"""
fpath = self._config["fpath"]
if fpath == "PIPELINE":
target_modules = [self._name]
for target_module in target_modules:
fpath = Status.parse_step_status(self._out_root, target_module)
if fpath:
break
else:
raise PipelineError(
"Could not parse fpath from previous pipeline jobs."
)
fpath = fpath[0]
logger.info(
"QA/QC using the following "
"pipeline input for fpath: {}".format(fpath)
)
return fpath
@property
def sub_dir(self):
"""
QA/QC sub directory for this module's outputs
"""
return self._config.get("sub_dir", None)
@property
def plot_type(self):
"""Get the QA/QC plot type: either 'plot' or 'plotly'"""
return self._config.get("plot_type", self._default_plot_type)
@property
def dsets(self):
"""Get the reV_h5 dsets to QA/QC"""
return self._config.get("dsets", None)
@property
def group(self):
"""Get the reV_h5 group to QA/QC"""
return self._config.get("group", None)
@property
def process_size(self):
"""Get the reV_h5 process_size for QA/QC"""
return self._config.get("process_size", None)
@property
def cmap(self):
"""Get the QA/QC plot colormap"""
return self._config.get("cmap", self._default_cmap)
@property
def plot_step(self):
"""Get the QA/QC step between exclusion mask points to plot"""
return self._config.get("cmap", self._default_plot_step)
@property
def columns(self):
"""Get the supply_curve columns to QA/QC"""
return self._config.get("columns", None)
@property
def lcoe(self):
"""Get the supply_curve lcoe column to plot"""
return self._config.get("lcoe", self._default_lcoe)
@property
def excl_fpath(self):
"""Get the source exclusions filepath"""
excl_fpath = self._config.get("excl_fpath", "PIPELINE")
if excl_fpath == "PIPELINE":
target_module = ModuleName.SUPPLY_CURVE_AGGREGATION
excl_fpath = Status.parse_step_status(
self._out_root, target_module, key="excl_fpath"
)
if not excl_fpath:
excl_fpath = None
msg = (
"Could not parse excl_fpath from previous "
"pipeline jobs, defaulting to: {}".format(excl_fpath)
)
logger.warning(msg)
warn(msg)
else:
excl_fpath = excl_fpath[0]
logger.info(
"QA/QC using the following "
"pipeline input for excl_fpath: {}".format(excl_fpath)
)
return excl_fpath
@property
def excl_dict(self):
"""Get the exclusions dictionary"""
excl_dict = self._config.get("excl_dict", "PIPELINE")
if excl_dict == "PIPELINE":
target_module = ModuleName.SUPPLY_CURVE_AGGREGATION
excl_dict = Status.parse_step_status(
self._out_root, target_module, key="excl_dict"
)
if not excl_dict:
excl_dict = None
msg = (
"Could not parse excl_dict from previous "
"pipeline jobs, defaulting to: {}".format(excl_dict)
)
logger.warning(msg)
warn(msg)
else:
excl_dict = excl_dict[0]
logger.info(
"QA/QC using the following "
"pipeline input for excl_dict: {}".format(excl_dict)
)
return excl_dict
@property
def area_filter_kernel(self):
"""Get the minimum area filter kernel name ('queen' or 'rook')."""
area_filter_kernel = self._config.get("area_filter_kernel", "PIPELINE")
if area_filter_kernel == "PIPELINE":
target_module = ModuleName.SUPPLY_CURVE_AGGREGATION
key = "area_filter_kernel"
area_filter_kernel = Status.parse_step_status(
self._out_root, target_module, key=key
)
if not area_filter_kernel:
area_filter_kernel = self._default_area_filter_kernel
msg = (
"Could not parse area_filter_kernel from previous "
"pipeline jobs, defaulting to: {}".format(
area_filter_kernel
)
)
logger.warning(msg)
warn(msg)
else:
area_filter_kernel = area_filter_kernel[0]
logger.info(
"QA/QC using the following "
"pipeline input for area_filter_kernel: {}".format(
area_filter_kernel
)
)
return area_filter_kernel
@property
def min_area(self):
"""Get the minimum area filter minimum area in km2."""
min_area = self._config.get("min_area", "PIPELINE")
if min_area == "PIPELINE":
target_module = ModuleName.SUPPLY_CURVE_AGGREGATION
min_area = Status.parse_step_status(
self._out_root, target_module, key="min_area"
)
if not min_area:
min_area = None
msg = (
"Could not parse min_area from previous "
"pipeline jobs, defaulting to: {}".format(min_area)
)
logger.warning(msg)
warn(msg)
else:
min_area = min_area[0]
logger.info(
"QA/QC using the following "
"pipeline input for min_area: {}".format(min_area)
)
return min_area