# -*- coding: utf-8 -*-
"""
reV quality assurance and control classes
"""
import logging
import numpy as np
import os
import pandas as pd
from warnings import warn
from reV.qa_qc.summary import (SummarizeH5, SummarizeSupplyCurve, SummaryPlots,
SupplyCurvePlot, ExclusionsMask)
from reV.supply_curve.exclusions import ExclusionMaskFromDict
from reV.utilities import log_versions, ModuleName
from reV.utilities.exceptions import PipelineError
from gaps.status import Status
# Module-level logger, named after this module via __name__
logger = logging.getLogger(__name__)
class QaQc:
    """
    reV QA/QC

    Each of the :meth:`h5`, :meth:`supply_curve` and :meth:`exclusions_mask`
    classmethods creates the output directory, runs the corresponding
    summary and/or plotting routine, logs any failure and re-raises it.
    """

    def __init__(self, out_dir):
        """
        Parameters
        ----------
        out_dir : str
            Directory path to save summary data and plots to. The directory
            is created if it does not already exist.
        """
        log_versions(logger)
        logger.info('QA/QC results to be saved to: {}'.format(out_dir))
        os.makedirs(out_dir, exist_ok=True)
        self._out_dir = out_dir

    @property
    def out_dir(self):
        """
        Output directory

        Returns
        -------
        str
        """
        return self._out_dir

    @staticmethod
    def _plot_sub_dir(summary_csv, out_root):
        """
        Build the plot sub-directory path for a summary table

        Parameters
        ----------
        summary_csv : str
            Path to .csv file containing summary table
        out_root : str
            Root output directory

        Returns
        -------
        str
            ``out_root`` joined with the base name of ``summary_csv`` with a
            trailing '.csv' removed (if present)
        """
        name = os.path.basename(summary_csv)
        suffix = '.csv'
        # Remove the exact suffix only. str.rstrip('.csv') would strip any
        # trailing run of the characters '.', 'c', 's', 'v' and mangle
        # names such as 'sc.csv' or 'stats.csv'.
        if name.endswith(suffix):
            name = name[:-len(suffix)]

        return os.path.join(out_root, name)

    @staticmethod
    def _scatter_plot(summary_csv, out_root, plot_type='plotly',
                      cmap='viridis', **kwargs):
        """
        Create scatter plot for all summary stats in summary table and save
        to a sub-directory of out_root named after the summary table

        Parameters
        ----------
        summary_csv : str
            Path to .csv file containing summary table
        out_root : str
            Output directory to save plots to
        plot_type : str, optional
            plot_type of plot to create 'plot' or 'plotly', by default 'plotly'
        cmap : str, optional
            Colormap name, by default 'viridis'
        kwargs : dict
            Additional plotting kwargs
        """
        out_dir = QaQc._plot_sub_dir(summary_csv, out_root)
        os.makedirs(out_dir, exist_ok=True)

        SummaryPlots.scatter_all(summary_csv, out_dir, plot_type=plot_type,
                                 cmap=cmap, **kwargs)

    def create_scatter_plots(self, plot_type='plotly', cmap='viridis',
                             **kwargs):
        """
        Create scatter plot for all compatible summary .csv files, i.e.
        every .csv file in the output directory that has 'gid', 'latitude'
        and 'longitude' columns

        Parameters
        ----------
        plot_type : str, optional
            plot_type of plot to create 'plot' or 'plotly', by default 'plotly'
        cmap : str, optional
            Colormap name, by default 'viridis'
        kwargs : dict
            Additional plotting kwargs
        """
        required = ('gid', 'latitude', 'longitude')
        for fname in os.listdir(self.out_dir):
            if not fname.endswith('.csv'):
                continue

            summary_csv = os.path.join(self.out_dir, fname)
            # Only the column names are needed to test compatibility, so
            # read the header row and skip parsing the data.
            header = pd.read_csv(summary_csv, nrows=0)
            if all(col in header for col in required):
                self._scatter_plot(summary_csv, self.out_dir,
                                   plot_type=plot_type, cmap=cmap,
                                   **kwargs)

    @classmethod
    def h5(cls, h5_file, out_dir, dsets=None, group=None, process_size=None,
           max_workers=None, plot_type='plotly', cmap='viridis', **kwargs):
        """
        Run QA/QC by computing summary stats from dsets in h5_file and
        plotting scatter plots of compatible summary stats

        Parameters
        ----------
        h5_file : str
            Path to .h5 file to run QA/QC on
        out_dir : str
            Directory path to save summary tables and plots to
        dsets : str | list, optional
            Datasets to summarize, by default None
        group : str, optional
            Group within h5_file to summarize datasets for, by default None
        process_size : int, optional
            Number of sites to process at a time, by default None
        max_workers : int, optional
            Number of workers to use when summarizing 2D datasets,
            by default None
        plot_type : str, optional
            plot_type of plot to create 'plot' or 'plotly', by default 'plotly'
        cmap : str, optional
            Colormap name, by default 'viridis'
        kwargs : dict
            Additional plotting kwargs
        """
        try:
            qa_qc = cls(out_dir)
            SummarizeH5.run(h5_file, out_dir, group=group,
                            dsets=dsets, process_size=process_size,
                            max_workers=max_workers)
            qa_qc.create_scatter_plots(plot_type=plot_type, cmap=cmap,
                                       **kwargs)
        except Exception as e:
            logger.exception('QAQC failed on file: {}. Received exception:\n{}'
                             .format(os.path.basename(h5_file), e))
            raise
        else:
            logger.info('Finished QAQC on file: {} output directory: {}'
                        .format(os.path.basename(h5_file), out_dir))

    @classmethod
    def supply_curve(cls, sc_table, out_dir, columns=None, lcoe='mean_lcoe',
                     plot_type='plotly', cmap='viridis', sc_plot_kwargs=None,
                     scatter_plot_kwargs=None):
        """
        Summarize the supply curve table, plot the supply curve and create
        scatter plots of the table

        Parameters
        ----------
        sc_table : str
            Path to .csv file containing supply curve table
        out_dir : str
            Directory path to save summary tables and plots to
        columns : str | list, optional
            Column(s) to summarize, if None summarize all numeric columns,
            by default None
        lcoe : str, optional
            LCOE value to plot, by default 'mean_lcoe'
        plot_type : str, optional
            plot_type of plot to create 'plot' or 'plotly', by default 'plotly'
        cmap : str, optional
            Colormap name, by default 'viridis'
        sc_plot_kwargs : dict, optional
            Kwargs for supply curve plot, by default None
        scatter_plot_kwargs : dict, optional
            Kwargs for scatter plot, by default None
        """
        if sc_plot_kwargs is None:
            sc_plot_kwargs = {}

        if scatter_plot_kwargs is None:
            scatter_plot_kwargs = {}

        try:
            qa_qc = cls(out_dir)
            SummarizeSupplyCurve.run(sc_table, out_dir, columns=columns)
            SupplyCurvePlot.plot(sc_table, out_dir, plot_type=plot_type,
                                 lcoe=lcoe, **sc_plot_kwargs)
            qa_qc._scatter_plot(sc_table, out_dir, plot_type=plot_type,
                                cmap=cmap, **scatter_plot_kwargs)
        except Exception as e:
            logger.exception('QAQC failed on file: {}. Received exception:\n{}'
                             .format(os.path.basename(sc_table), e))
            raise
        else:
            logger.info('Finished QAQC on file: {} output directory: {}'
                        .format(os.path.basename(sc_table), out_dir))

    @classmethod
    def exclusions_mask(cls, excl_h5, out_dir, layers_dict=None, min_area=None,
                        kernel='queen', hsds=False, plot_type='plotly',
                        cmap='viridis', plot_step=100, **kwargs):
        """
        Create inclusion mask from given layers dictionary, dump to disk and
        plot

        Parameters
        ----------
        excl_h5 : str
            Path to exclusions .h5 file
        out_dir : str
            Directory path to save the mask .npy file and plots to
        layers_dict : dict | NoneType
            Dictionary of LayerMask arguments {layer: {kwarg: value}}
        min_area : float | NoneType
            Minimum required contiguous area in sq-km
        kernel : str
            Contiguous filter method to use on final exclusions
        hsds : bool
            Boolean flag to use h5pyd to handle .h5 'files' hosted on AWS
            behind HSDS
        plot_type : str, optional
            plot_type of plot to create 'plot' or 'plotly', by default 'plotly'
        cmap : str, optional
            Colormap name, by default 'viridis'
        plot_step : int
            Step between points to plot
        kwargs : dict
            Additional plotting kwargs
        """
        try:
            # Instantiate only for its side effects: version logging and
            # creation of the output directory.
            cls(out_dir)
            excl_mask = ExclusionMaskFromDict.run(excl_h5,
                                                  layers_dict=layers_dict,
                                                  min_area=min_area,
                                                  kernel=kernel,
                                                  hsds=hsds)
            # Scale by 100 and round so the mask can be stored as uint8
            excl_mask = np.round(excl_mask * 100).astype('uint8')

            out_file = os.path.basename(excl_h5).replace('.h5', '_mask.npy')
            out_file = os.path.join(out_dir, out_file)
            np.save(out_file, excl_mask)

            ExclusionsMask.plot(excl_mask, out_dir, plot_type=plot_type,
                                cmap=cmap, plot_step=plot_step, **kwargs)
        except Exception as e:
            logger.exception('QAQC failed on file: {}. Received exception:\n{}'
                             .format(os.path.basename(excl_h5), e))
            raise
        else:
            logger.info('Finished QAQC on file: {} output directory: {}'
                        .format(os.path.basename(excl_h5), out_dir))
class QaQcModule:
    """Class to handle Module QA/QC"""

    def __init__(self, module_name, config, out_root):
        """
        Parameters
        ----------
        module_name : str
            Name of the module being QA'd. Used as the pipeline step to
            look up when the configured 'fpath' is 'PIPELINE'.
        config : dict
            Dictionary with pre-extracted config input group.
        out_root : str
            Location passed to ``Status.parse_step_status`` when resolving
            'PIPELINE' inputs.

        Raises
        ------
        TypeError
            If config is not a dict
        """
        if not isinstance(config, dict):
            raise TypeError('Config input must be a dict but received: {}'
                            .format(type(config)))

        self._name = module_name
        self._config = config
        self._out_root = out_root

        # Fallback values used when the config does not provide an entry
        self._default_plot_type = 'plotly'
        self._default_cmap = 'viridis'
        self._default_plot_step = 100
        self._default_lcoe = 'mean_lcoe'
        self._default_area_filter_kernel = 'queen'

    def _parse_pipeline_input(self, key, default=None):
        """
        Get a config value, resolving 'PIPELINE' from the supply curve
        aggregation step status

        Parameters
        ----------
        key : str
            Config key to retrieve. The same key is requested from the
            step status when the config value is missing or 'PIPELINE'.
        default : object, optional
            Value to fall back to (with a warning) when nothing could be
            parsed from the pipeline status, by default None

        Returns
        -------
        object
            The configured value, the first value parsed from the pipeline
            status, or default
        """
        value = self._config.get(key, 'PIPELINE')
        if value != 'PIPELINE':
            return value

        target_module = ModuleName.SUPPLY_CURVE_AGGREGATION
        parsed = Status.parse_step_status(self._out_root, target_module,
                                          key=key)
        if not parsed:
            msg = ('Could not parse {} from previous '
                   'pipeline jobs, defaulting to: {}'.format(key, default))
            logger.warning(msg)
            warn(msg)
            return default

        value = parsed[0]
        logger.info('QA/QC using the following '
                    'pipeline input for {}: {}'.format(key, value))

        return value

    @property
    def fpath(self):
        """Get the reV module output filepath(s)

        Returns
        -------
        fpaths : str | list
            One or more filepaths output by current module being QA'd

        Raises
        ------
        PipelineError
            If 'fpath' is 'PIPELINE' and nothing could be parsed from the
            status of this module
        """
        fpath = self._config['fpath']

        if fpath == 'PIPELINE':
            fpath = Status.parse_step_status(self._out_root, self._name)
            if not fpath:
                raise PipelineError('Could not parse fpath from previous '
                                    'pipeline jobs.')

            fpath = fpath[0]
            logger.info('QA/QC using the following '
                        'pipeline input for fpath: {}'.format(fpath))

        return fpath

    @property
    def sub_dir(self):
        """
        QA/QC sub directory for this module's outputs
        """
        return self._config.get('sub_dir', None)

    @property
    def plot_type(self):
        """Get the QA/QC plot type: either 'plot' or 'plotly'"""
        return self._config.get('plot_type', self._default_plot_type)

    @property
    def dsets(self):
        """Get the reV_h5 dsets to QA/QC"""
        return self._config.get('dsets', None)

    @property
    def group(self):
        """Get the reV_h5 group to QA/QC"""
        return self._config.get('group', None)

    @property
    def process_size(self):
        """Get the reV_h5 process_size for QA/QC"""
        return self._config.get('process_size', None)

    @property
    def cmap(self):
        """Get the QA/QC plot colormap"""
        return self._config.get('cmap', self._default_cmap)

    @property
    def plot_step(self):
        """Get the QA/QC step between exclusion mask points to plot"""
        # Read the 'plot_step' key; this previously read 'cmap' by mistake,
        # returning the colormap name and ignoring a configured plot_step.
        return self._config.get('plot_step', self._default_plot_step)

    @property
    def columns(self):
        """Get the supply_curve columns to QA/QC"""
        return self._config.get('columns', None)

    @property
    def lcoe(self):
        """Get the supply_curve lcoe column to plot"""
        return self._config.get('lcoe', self._default_lcoe)

    @property
    def excl_fpath(self):
        """Get the source exclusions filepath"""
        return self._parse_pipeline_input('excl_fpath')

    @property
    def excl_dict(self):
        """Get the exclusions dictionary"""
        return self._parse_pipeline_input('excl_dict')

    @property
    def area_filter_kernel(self):
        """Get the minimum area filter kernel name ('queen' or 'rook')."""
        return self._parse_pipeline_input(
            'area_filter_kernel', default=self._default_area_filter_kernel)

    @property
    def min_area(self):
        """Get the minimum area filter minimum area in km2."""
        return self._parse_pipeline_input('min_area')