Source code for reV.generation.base

# -*- coding: utf-8 -*-
"""
reV base gen and econ module.
"""
import copy
import json
import logging
import os
import sys
from abc import ABC, abstractmethod
from concurrent.futures import TimeoutError
from warnings import warn

import numpy as np
import pandas as pd
import psutil
from rex.resource import Resource
from rex.utilities.execution import SpawnProcessPool

from reV.config.output_request import SAMOutputRequest
from reV.config.project_points import PointsControl, ProjectPoints
from reV.handlers.outputs import Outputs
from reV.SAM.version_checker import PySamVersionChecker
from reV.utilities import ModuleName, ResourceMetaField, log_versions
from reV.utilities.exceptions import (
    ExecutionError,
    OffshoreWindInputWarning,
    OutputWarning,
    ParallelExecutionWarning,
)

logger = logging.getLogger(__name__)


ATTR_DIR = os.path.dirname(os.path.realpath(__file__))
ATTR_DIR = os.path.join(ATTR_DIR, 'output_attributes')
with open(os.path.join(ATTR_DIR, 'other.json')) as f:
    OTHER_ATTRS = json.load(f)
with open(os.path.join(ATTR_DIR, 'lcoe_fcr.json')) as f:
    LCOE_ATTRS = json.load(f)
with open(os.path.join(ATTR_DIR, 'single_owner.json')) as f:
    SO_ATTRS = json.load(f)
with open(os.path.join(ATTR_DIR, 'windbos.json')) as f:
    BOS_ATTRS = json.load(f)
with open(os.path.join(ATTR_DIR, 'lcoe_fcr_inputs.json')) as f:
    LCOE_IN_ATTRS = json.load(f)

LCOE_REQUIRED_OUTPUTS = ("system_capacity", "capital_cost_multiplier",
                         "capital_cost", "fixed_operating_cost",
                         "variable_operating_cost", "base_capital_cost",
                         "base_fixed_operating_cost",
                         "base_variable_operating_cost", "fixed_charge_rate")
"""Required econ outputs in generation file."""


def _add_lcoe_outputs(output_request):
    """Add required lcoe outputs to output request. """
    for out_var in LCOE_REQUIRED_OUTPUTS:
        if out_var not in output_request:
            output_request.append(out_var)
    return output_request


[docs]class BaseGen(ABC): """Base class for reV gen and econ classes to run SAM simulations.""" # Mapping of reV requests to SAM objects that should be used for simulation OPTIONS = {} # Mapping of reV generation / econ outputs to scale factors and units. OUT_ATTRS = copy.deepcopy(OTHER_ATTRS) # Mapping of reV econ outputs to scale factors and units. # Type is scalar or array and corresponds to the SAM single-site output # This is the OUT_ATTRS class attr for Econ but should also be accessible # to rev generation ECON_ATTRS = copy.deepcopy(OTHER_ATTRS) ECON_ATTRS.update(LCOE_ATTRS) ECON_ATTRS.update(SO_ATTRS) ECON_ATTRS.update(BOS_ATTRS) ECON_ATTRS.update(LCOE_IN_ATTRS) # SAM argument names used to calculate LCOE # Note that system_capacity is not included here because it is never used # downstream and could be confused with the supply_curve point capacity LCOE_ARGS = ('fixed_charge_rate', 'capital_cost', 'fixed_operating_cost', 'variable_operating_cost') def __init__( self, points_control, output_request, site_data=None, drop_leap=False, memory_utilization_limit=0.4, scale_outputs=True, ): """ Parameters ---------- points_control : reV.config.project_points.PointsControl Project points control instance for site and SAM config spec. output_request : list | tuple Output variables requested from SAM. site_data : str | pd.DataFrame | None Site-specific input data for SAM calculation. String should be a filepath that points to a csv, DataFrame is pre-extracted data. Rows match sites, columns are input keys. Need a "gid" column. Input as None if no site-specific data. drop_leap : bool Drop leap day instead of final day of year during leap years. memory_utilization_limit : float Memory utilization limit (fractional). This sets how many site results will be stored in-memory at any given time before flushing to disk. scale_outputs : bool Flag to scale outputs in-place immediately upon Gen returning data. """ log_versions(logger) self._points_control = points_control self._year = None self._site_limit = None self._site_mem = None self._out_fpath = None self._meta = None self._time_index = None self._sam_module = None self._sam_obj_default = None self._drop_leap = drop_leap self.mem_util_lim = memory_utilization_limit self.scale_outputs = scale_outputs self._run_attrs = { "points_control": str(points_control), "output_request": output_request, "site_data": str(site_data), "drop_leap": str(drop_leap), "memory_utilization_limit": self.mem_util_lim, } self._site_data = self._parse_site_data(site_data) self.add_site_data_to_pp(self._site_data) output_request = SAMOutputRequest(output_request) self._output_request = self._parse_output_request(output_request) # pre-initialize output arrays to store results when available. self._out = {} self._finished_sites = [] self._out_n_sites = 0 self._out_chunk = () self._check_sam_version_inputs() @property def output_request(self): """Get the output variables requested from the user. Returns ------- output_request : list Output variables requested from SAM. """ return self._output_request @property def out_chunk(self): """Get the current output chunk index range (INCLUSIVE). Returns ------- _out_chunk : tuple Two entry tuple (start, end) indicies (inclusive) for where the current data in-memory belongs in the final output. """ return self._out_chunk @property def site_data(self): """Get the site-specific inputs in dataframe format. Returns ------- _site_data : pd.DataFrame Site-specific input data for gen or econ calculation. Rows match sites, columns are variables. """ return self._site_data @property def site_limit(self): """Get the number of sites results that can be stored in memory at once Returns ------- _site_limit : int Number of site result sets that can be stored in memory at once without violating memory limits. """ if self._site_limit is None: tot_mem = psutil.virtual_memory().total / 1e6 avail_mem = self.mem_util_lim * tot_mem self._site_limit = int(np.floor(avail_mem / self.site_mem)) logger.info( "Limited to storing {0} sites in memory " "({1:.1f} GB total hardware, {2:.1f} GB available " "with {3:.1f}% utilization).".format( self._site_limit, tot_mem / 1e3, avail_mem / 1e3, self.mem_util_lim * 100, ) ) return self._site_limit @property def site_mem(self): """Get the memory (MB) required to store all results for a single site. Returns ------- _site_mem : float Memory (MB) required to store all results in requested in output_request for a single site. """ if self._site_mem is None: # average the memory usage over n sites # (for better understanding of array overhead) n = 100 self._site_mem = 0 for request in self.output_request: dtype = "float32" if request in self.OUT_ATTRS: dtype = self.OUT_ATTRS[request].get("dtype", "float32") shape = self._get_data_shape(request, n) self._site_mem += sys.getsizeof(np.ones(shape, dtype=dtype)) self._site_mem = self._site_mem / 1e6 / n logger.info( "Output results from a single site are calculated to " "use {0:.1f} KB of memory.".format(self._site_mem / 1000) ) return self._site_mem @property def points_control(self): """Get project points controller. Returns ------- points_control : reV.config.project_points.PointsControl Project points control instance for site and SAM config spec. """ return self._points_control @property def project_points(self): """Get project points Returns ------- project_points : reV.config.project_points.ProjectPoints Project points from the points control instance. """ return self._points_control.project_points @property def sam_configs(self): """Get the sam config dictionary. Returns ------- sam_configs : dict SAM config from the project points instance. """ return self.project_points.sam_inputs @property def sam_metas(self): """ SAM configurations including runtime module Returns ------- sam_metas : dict Nested dictionary of SAM configuration files with module used at runtime """ sam_metas = self.sam_configs.copy() for v in sam_metas.values(): v.update({"module": self._sam_module.MODULE}) return sam_metas @property def sam_module(self): """Get the SAM module class to be used for SAM simulations. Returns ------- sam_module : object SAM object like PySAM.Pvwattsv7 or PySAM.Lcoefcr """ return self._sam_module @property def meta(self): """Get resource meta for all sites in project points. Returns ------- meta : pd.DataFrame Meta data df for sites in project points. Column names are meta data variables, rows are different sites. The row index does not indicate the site number if the project points are non-sequential or do not start from 0, so a `SiteDataField.GID` column is added. """ return self._meta @property def time_index(self): """Get the resource time index data. Returns ------- _time_index : pandas.DatetimeIndex Time-series datetime index """ return self._time_index @property def run_attrs(self): """ Run time attributes (__init__ args and kwargs) Returns ------- run_attrs : dict Dictionary of runtime args and kwargs """ return self._run_attrs @property def year(self): """Get the resource year. Returns ------- _year : int Year of the time-series datetime index. """ if self._year is None and self.time_index is not None: self._year = int(self.time_index.year[0]) return self._year @property def tech(self): """Get the reV technology string. Returns ------- tech : str SAM technology to analyze (pvwattsv7, windpower, tcsmoltensalt, solarwaterheat, troughphysicalheat, lineardirectsteam, econ) The string should be lower-cased with spaces and _ removed. """ return self.project_points.tech @property def out(self): """Get the reV gen or econ output results. Returns ------- out : dict Dictionary of gen or econ results from SAM. """ out = {} for k, v in self._out.items(): if k in self.OUT_ATTRS: scale_factor = self.OUT_ATTRS[k].get("scale_factor", 1) else: scale_factor = 1 if scale_factor != 1 and self.scale_outputs: v = v.astype("float32") v /= scale_factor out[k] = v return out @out.setter def out(self, result): """Set the output attribute, unpack futures, clear output from mem. Parameters ---------- result : list | dict | None Gen or Econ results to set to output dictionary. Use cases: - List input is interpreted as a futures list, which is unpacked before setting to the output dict. - Dictionary input is interpreted as an already unpacked result. - None is interpreted as a signal to clear the output dictionary. """ if isinstance(result, list): # unpack futures list to dictionary first result = self.unpack_futures(result) if isinstance(result, dict): # iterate through dict where sites are keys and values are # corresponding results for site_gid, site_output in result.items(): # check that the sites are stored sequentially then add to # the finished site list if self._finished_sites: if int(site_gid) < np.max(self._finished_sites): raise Exception("Site results are non sequential!") # unpack site output object self.unpack_output(site_gid, site_output) # add site gid to the finished list after outputs are unpacked self._finished_sites.append(site_gid) elif isinstance(result, type(None)): self._out.clear() self._finished_sites.clear() else: raise TypeError( "Did not recognize the type of output. " 'Tried to set output type "{}", but requires ' "list, dict or None.".format(type(result)) ) @staticmethod def _output_request_type_check(req): """Output request type check and ensure list for manipulation. Parameters ---------- req : list | tuple | str Output request of variable type. Returns ------- output_request : list Output request. """ if isinstance(req, list): output_request = req elif isinstance(req, tuple): output_request = list(req) elif isinstance(req, str): output_request = [req] else: raise TypeError( "Output request must be str, list, or tuple but " "received: {}".format(type(req)) ) return output_request
[docs] @staticmethod def handle_leap_ti(ti, drop_leap=False): """Handle a time index for a leap year by dropping a day. Parameters ---------- ti : pandas.DatetimeIndex Time-series datetime index with or without leap days. drop_leap : bool Option to drop leap days (if True) or drop the last day of each leap year (if False). Returns ------- ti : pandas.DatetimeIndex Time-series datetime index with length a multiple of 365. """ # Drop leap day or last day leap_day = (ti.month == 2) & (ti.day == 29) leap_year = ti.year % 4 == 0 last_day = ((ti.month == 12) & (ti.day == 31)) * leap_year if drop_leap: # Preference is to drop leap day if exists ti = ti.drop(ti[leap_day]) elif any(leap_day): # Leap day exists but preference is to drop last day of year ti = ti.drop(ti[last_day]) if len(ti) % 365 != 0: raise ValueError( "Bad time index with length not a multiple of " "365: {}".format(ti) ) return ti
@staticmethod def _pp_to_pc( points, points_range, sam_configs, tech, sites_per_worker=None, res_file=None, curtailment=None, ): """ Create ProjectControl from ProjectPoints Parameters ---------- points : int | slice | list | str | pandas.DataFrame | reV.config.project_points.PointsControl Single site integer, or slice or list specifying project points, or string pointing to a project points csv, or a pre-loaded project points DataFrame, or a fully instantiated PointsControl object. points_range : list | None Optional two-entry list specifying the index range of the sites to analyze. To be taken from the reV.config.PointsControl.split_range property. sam_configs : dict | str | SAMConfig SAM input configuration ID(s) and file path(s). Keys are the SAM config ID(s) which map to the config column in the project points CSV. Values are either a JSON SAM config file or dictionary of SAM config inputs. Can also be a single config file path or a pre loaded SAMConfig object. tech : str SAM technology to analyze (pvwattsv7, windpower, tcsmoltensalt, solarwaterheat, troughphysicalheat, lineardirectsteam) The string should be lower-cased with spaces and _ removed. sites_per_worker : int Number of sites to run in series on a worker. None defaults to the resource file chunk size. res_file : str Filepath to single resource file, multi-h5 directory, or /h5_dir/prefix*suffix curtailment : NoneType | dict | str | config.curtailment.Curtailment Inputs for curtailment parameters. If not None, curtailment inputs are expected. Can be: - Explicit namespace of curtailment variables (dict) - Pointer to curtailment config json file with path (str) - Instance of curtailment config object (config.curtailment.Curtailment) Returns ------- pc : reV.config.project_points.PointsControl PointsControl object instance. """ if hasattr(points, "df"): points = points.df pp = ProjectPoints( points, sam_configs, tech=tech, res_file=res_file, curtailment=curtailment, ) # make Points Control instance if points_range is not None: # PointsControl is for just a subset of the project points... # this is the case if generation is being initialized on one # of many HPC nodes in a large project pc = PointsControl.split( points_range[0], points_range[1], pp, sites_per_split=sites_per_worker, ) else: # PointsControl is for all of the project points pc = PointsControl(pp, sites_per_split=sites_per_worker) return pc
[docs] @classmethod def get_pc( cls, points, points_range, sam_configs, tech, sites_per_worker=None, res_file=None, curtailment=None, ): """Get a PointsControl instance. Parameters ---------- points : int | slice | list | str | pandas.DataFrame | PointsControl Single site integer, or slice or list specifying project points, or string pointing to a project points csv, or a pre-loaded project points DataFrame, or a fully instantiated PointsControl object. points_range : list | None Optional two-entry list specifying the index range of the sites to analyze. To be taken from the reV.config.PointsControl.split_range property. sam_configs : dict | str | SAMConfig SAM input configuration ID(s) and file path(s). Keys are the SAM config ID(s) which map to the config column in the project points CSV. Values are either a JSON SAM config file or dictionary of SAM config inputs. Can also be a single config file path or a pre loaded SAMConfig object. tech : str SAM technology to analyze (pvwattsv7, windpower, tcsmoltensalt, solarwaterheat, troughphysicalheat, lineardirectsteam) The string should be lower-cased with spaces and _ removed. sites_per_worker : int Number of sites to run in series on a worker. None defaults to the resource file chunk size. res_file : str Filepath to single resource file, multi-h5 directory, or /h5_dir/prefix*suffix curtailment : NoneType | dict | str | config.curtailment.Curtailment Inputs for curtailment parameters. If not None, curtailment inputs are expected. Can be: - Explicit namespace of curtailment variables (dict) - Pointer to curtailment config json file with path (str) - Instance of curtailment config object (config.curtailment.Curtailment) Returns ------- pc : reV.config.project_points.PointsControl PointsControl object instance. """ if tech not in cls.OPTIONS and tech.lower() != ModuleName.ECON: msg = ( 'Did not recognize reV-SAM technology string "{}". ' "Technology string options are: {}".format( tech, list(cls.OPTIONS.keys()) ) ) logger.error(msg) raise KeyError(msg) if sites_per_worker is None: # get the optimal sites per split based on res file chunk size sites_per_worker = cls.get_sites_per_worker(res_file) logger.debug( "Sites per worker being set to {} for " "PointsControl.".format( sites_per_worker ) ) if isinstance(points, PointsControl): # received a pre-intialized instance of pointscontrol pc = points else: pc = cls._pp_to_pc( points, points_range, sam_configs, tech, sites_per_worker=sites_per_worker, res_file=res_file, curtailment=curtailment, ) return pc
[docs] @staticmethod def get_sites_per_worker(res_file, default=100): """Get the nominal sites per worker (x-chunk size) for a given file. This is based on the concept that it is most efficient for one core to perform one read on one chunk of resource data, such that chunks will not have to be read into memory twice and no sites will be read redundantly. Parameters ---------- res_file : str Filepath to single resource file, multi-h5 directory, or /h5_dir/prefix*suffix default : int Sites to be analyzed on a single core if the chunk size cannot be determined from res_file. Returns ------- sites_per_worker : int Nominal sites to be analyzed per worker. This is set to the x-axis chunk size for windspeed and dni datasets for the WTK and NSRDB data, respectively. """ if not res_file or not os.path.isfile(res_file): return default with Resource(res_file) as res: if "wtk" in res_file.lower(): for dset in res.datasets: if "speed" in dset: # take nominal WTK chunks from windspeed _, _, chunks = res.get_dset_properties(dset) break elif "nsrdb" in res_file.lower(): # take nominal NSRDB chunks from dni _, _, chunks = res.get_dset_properties("dni") else: warn( "Could not infer dataset chunk size as the resource type " "could not be determined from the filename: {}".format( res_file ) ) chunks = None if chunks is None: # if chunks not set, go to default sites_per_worker = default logger.debug( "Sites per worker being set to {} (default) based on " "no set chunk size in {}.".format(sites_per_worker, res_file) ) else: sites_per_worker = chunks[1] logger.debug( "Sites per worker being set to {} based on chunk " "size of {}.".format(sites_per_worker, res_file) ) return sites_per_worker
[docs] @staticmethod def unpack_futures(futures): """Combine list of futures results into their native dict format/type. Parameters ---------- futures : list List of dictionary futures results. Returns ------- out : dict Compiled results of the native future results type (dict). """ out = {} for x in futures: out.update(x) return out
@staticmethod @abstractmethod def _run_single_worker( points_control, tech=None, res_file=None, output_request=None, scale_outputs=True, ): """Run a reV-SAM analysis based on the points_control iterator. Parameters ---------- points_control : reV.config.PointsControl A PointsControl instance dictating what sites and configs are run. tech : str SAM technology to analyze (pvwattsv7, windpower, tcsmoltensalt, solarwaterheat, troughphysicalheat, lineardirectsteam) The string should be lower-cased with spaces and _ removed. res_file : str Filepath to single resource file, multi-h5 directory, or /h5_dir/prefix*suffix output_request : list | tuple Output variables requested from SAM. scale_outputs : bool Flag to scale outputs in-place immediately upon returning data. Returns ------- out : dict Output dictionary from the SAM reV_run function. Data is scaled within this function to the datatype specified in cls.OUT_ATTRS. """ def _parse_site_data(self, inp): """Parse site-specific data from input arg Parameters ---------- inp : str | pd.DataFrame | None Site data in .csv or pre-extracted dataframe format. None signifies that there is no extra site-specific data and that everything is fully defined in the input h5 and SAM json configs. Returns ------- site_data : pd.DataFrame Site-specific data for econ calculation. Rows correspond to sites, columns are variables. """ if inp is None or inp is False: # no input, just initialize dataframe with site gids as index site_data = pd.DataFrame(index=self.project_points.sites) site_data.index.name = ResourceMetaField.GID else: # explicit input, initialize df if isinstance(inp, str): if inp.endswith(".csv"): site_data = pd.read_csv(inp) elif isinstance(inp, pd.DataFrame): site_data = inp else: # site data was not able to be set. Raise error. raise Exception( "Site data input must be .csv or " "dataframe, but received: {}".format(inp) ) gid_not_in_site_data = ResourceMetaField.GID not in site_data index_name_not_gid = site_data.index.name != ResourceMetaField.GID if gid_not_in_site_data and index_name_not_gid: # require gid as column label or index raise KeyError('Site data input must have ' f'{ResourceMetaField.GID} column to match ' 'reV site gid.') # pylint: disable=no-member if site_data.index.name != ResourceMetaField.GID: # make gid the dataframe index if not already site_data = site_data.set_index(ResourceMetaField.GID, drop=True) if "offshore" in site_data: if site_data["offshore"].sum() > 1: w = ('Found offshore sites in econ site data input. ' 'This functionality has been deprecated. ' 'Please run the reV offshore module to ' 'calculate offshore wind lcoe.') warn(w, OffshoreWindInputWarning) logger.warning(w) return site_data
[docs] def add_site_data_to_pp(self, site_data): """Add the site df (site-specific inputs) to project points dataframe. This ensures that only the relevant site's data will be passed through to parallel workers when points_control is iterated and split. Parameters ---------- site_data : pd.DataFrame Site-specific data for econ calculation. Rows correspond to sites, columns are variables. """ self.project_points.join_df(site_data, key=self.site_data.index.name)
def _parse_output_request(self, req): """Set the output variables requested from the user. Parameters ---------- req : list | tuple Output variables requested from SAM. Returns ------- output_request : list Output variables requested from SAM. """ output_request = self._output_request_type_check(req) if "lcoe_fcr" in output_request: output_request = _add_lcoe_outputs(output_request) return output_request def _get_data_shape(self, dset, n_sites): """Get the output array shape based on OUT_ATTRS or PySAM.Outputs. Parameters ---------- dset : str Variable name to get shape for. n_sites : int Number of sites for this data shape. Returns ------- shape : tuple 1D or 2D shape tuple for dset. """ if dset in self.OUT_ATTRS: return self._get_data_shape_from_out_attrs(dset, n_sites) if dset in self.project_points.all_sam_input_keys: return self._get_data_shape_from_sam_config(dset, n_sites) return self._get_data_shape_from_pysam(dset, n_sites) def _get_data_shape_from_out_attrs(self, dset, n_sites): """Get data shape from ``OUT_ATTRS`` variable""" if self.OUT_ATTRS[dset]["type"] == "array": return (len(self.time_index), n_sites) return (n_sites,) def _get_data_shape_from_sam_config(self, dset, n_sites): """Get data shape from SAM input config""" data = list(self.project_points.sam_inputs.values())[0][dset] if isinstance(data, (list, tuple, np.ndarray)): return (*np.array(data).shape, n_sites) if isinstance(data, str): msg = ( 'Cannot pass through non-scalar SAM input key "{}" ' "as an output_request!".format(dset) ) logger.error(msg) raise ExecutionError(msg) return (n_sites,) def _get_data_shape_from_pysam(self, dset, n_sites): """Get data shape from PySAM output object""" if self._sam_obj_default is None: self._sam_obj_default = self.sam_module.default() try: out_data = getattr(self._sam_obj_default.Outputs, dset) except AttributeError as e: msg = ( 'Could not get data shape for dset "{}" ' 'from object "{}". ' 'Received the following error: "{}"'.format( dset, self._sam_obj_default, e ) ) logger.error(msg) raise ExecutionError(msg) from e if isinstance(out_data, (int, float, str)): return (n_sites,) if len(out_data) % len(self.time_index) == 0: return (len(self.time_index), n_sites) return (len(out_data), n_sites) def _init_fpath(self, out_fpath, module): """Combine directory and filename, ensure .h5 ext., make out dirs.""" if out_fpath is None: return project_dir, out_fn = os.path.split(out_fpath) # ensure output file is an h5 if not out_fn.endswith(".h5"): out_fn += ".h5" if module not in out_fn: extension_with_module = "_{}.h5".format(module) out_fn = out_fn.replace(".h5", extension_with_module) # ensure year is in out_fpath if self.year is not None: extension_with_year = "_{}.h5".format(self.year) if extension_with_year not in out_fn: out_fn = out_fn.replace(".h5", extension_with_year) # create and use optional output dir if project_dir and not os.path.exists(project_dir): os.makedirs(project_dir, exist_ok=True) self._out_fpath = os.path.join(project_dir, out_fn) self._run_attrs["out_fpath"] = out_fpath def _init_h5(self, mode="w"): """Initialize the single h5 output file with all output requests. Parameters ---------- mode : str Mode to instantiate h5py.File instance """ if self._out_fpath is None: return if "w" in mode: logger.info( 'Initializing full output file: "{}" with mode: {}'.format( self._out_fpath, mode ) ) elif "a" in mode: logger.info( 'Appending data to output file: "{}" with mode: {}'.format( self._out_fpath, mode ) ) attrs = {d: {} for d in self.output_request} chunks = {} dtypes = {} shapes = {} # flag to write time index if profiles are being output write_ti = False for dset in self.output_request: tmp = "other" if dset in self.OUT_ATTRS: tmp = dset attrs[dset]["units"] = self.OUT_ATTRS[tmp].get("units", "unknown") attrs[dset]["scale_factor"] = self.OUT_ATTRS[tmp].get( "scale_factor", 1 ) chunks[dset] = self.OUT_ATTRS[tmp].get("chunks", None) dtypes[dset] = self.OUT_ATTRS[tmp].get("dtype", "float32") shapes[dset] = self._get_data_shape(dset, len(self.meta)) if len(shapes[dset]) > 1: write_ti = True # only write time index if profiles were found in output request if write_ti: ti = self.time_index else: ti = None Outputs.init_h5( self._out_fpath, self.output_request, shapes, attrs, chunks, dtypes, self.meta, time_index=ti, configs=self.sam_metas, run_attrs=self.run_attrs, mode=mode, ) def _init_out_arrays(self, index_0=0): """Initialize output arrays based on the number of sites that can be stored in memory safely. Parameters ---------- index_0 : int This is the site list index (not gid) for the first site in the output data. If a node cannot process all sites in-memory at once, this is used to segment the sites in the current output chunk. """ self._out = {} self._finished_sites = [] # Output chunk is the index range (inclusive) of this set of site outs self._out_chunk = ( index_0, np.min((index_0 + self.site_limit, len(self.project_points) - 1)), ) self._out_n_sites = int(self.out_chunk[1] - self.out_chunk[0]) + 1 logger.info( "Initializing in-memory outputs for {} sites with gids " "{} through {} inclusive (site list index {} through {})".format( self._out_n_sites, self.project_points.sites[self.out_chunk[0]], self.project_points.sites[self.out_chunk[1]], self.out_chunk[0], self.out_chunk[1], ) ) for request in self.output_request: dtype = "float32" if request in self.OUT_ATTRS and self.scale_outputs: dtype = self.OUT_ATTRS[request].get("dtype", "float32") shape = self._get_data_shape(request, self._out_n_sites) # initialize the output request as an array of zeros self._out[request] = np.zeros(shape, dtype=dtype) def _check_sam_version_inputs(self): """Check the PySAM version and input keys. Fix where necessary.""" for key, parameters in self.project_points.sam_inputs.items(): updated = PySamVersionChecker.run(self.tech, parameters) sam_obj = self._points_control._project_points._sam_config_obj sam_obj._inputs[key] = updated
[docs] def unpack_output(self, site_gid, site_output): """Unpack a SAM SiteOutput object to the output attribute. Parameters ---------- site_gid : int Resource-native site gid (index). site_output : dict SAM site output object. """ # iterate through the site results for var, value in site_output.items(): if var not in self._out: raise KeyError( 'Tried to collect output variable "{}", but it ' "was not yet initialized in the output " "dictionary." ) # get the index in the output array for the current site i = self.site_index(site_gid, out_index=True) # check to see if we have exceeded the current output chunk. # If so, flush data to disk and reset the output initialization if i + 1 > self._out_n_sites: self.flush() global_site_index = self.site_index(site_gid) self._init_out_arrays(index_0=global_site_index) i = self.site_index(site_gid, out_index=True) if isinstance(value, (list, tuple, np.ndarray)): if not isinstance(value, np.ndarray): value = np.array(value) self._out[var][:, i] = value.T elif value != 0: self._out[var][i] = value
[docs] def site_index(self, site_gid, out_index=False): """Get the index corresponding to the site gid. Parameters ---------- site_gid : int Resource-native site index (gid). out_index : bool Option to get output index (if true) which is the column index in the current in-memory output array, or (if false) the global site index from the project points site list. Returns ------- index : int Global site index if out_index=False, otherwise column index in the current in-memory output array. """ # get the index for site_gid in the (global) project points site list. global_site_index = self.project_points.sites.index(site_gid) if not out_index: output_index = global_site_index else: output_index = global_site_index - self.out_chunk[0] if output_index < 0: raise ValueError( "Attempting to set output data for site with " "gid {} to global site index {}, which was " "already set based on the current output " "index chunk of {}".format( site_gid, global_site_index, self.out_chunk ) ) return output_index
[docs] def flush(self): """Flush the output data in self.out attribute to disk in .h5 format. The data to be flushed is accessed from the instance attribute "self.out". The disk target is based on the instance attributes "self._out_fpath". Data is not flushed if _fpath is None or if .out is empty. """ # handle output file request if file is specified and .out is not empty if isinstance(self._out_fpath, str) and self._out: logger.info( 'Flushing outputs to disk, target file: "{}"'.format( self._out_fpath ) ) # get the slice of indices to write outputs to islice = slice(self.out_chunk[0], self.out_chunk[1] + 1) # open output file in append mode to add output results to with Outputs(self._out_fpath, mode="a") as f: # iterate through all output requests writing each as a dataset for dset, arr in self._out.items(): if len(arr.shape) == 1: # write array of scalars f[dset, islice] = arr else: # write 2D array of profiles f[dset, :, islice] = arr logger.debug("Flushed output successfully to disk.")
def _pre_split_pc(self, pool_size=None): """Pre-split project control iterator into sub chunks to further split the parallelization. Parameters ---------- pool_size : int Number of futures to submit to a single process pool for parallel futures. If ``None``, the pool size is set to ``os.cpu_count() * 2``. By default, ``None``. Returns ------- N : int Total number of points control split instances. pc_chunks : list List of lists of points control split instances. """ N = 0 pc_chunks = [] i_chunk = [] if pool_size is None: pool_size = os.cpu_count() * 2 for i, split in enumerate(self.points_control): N += 1 i_chunk.append(split) if (i + 1) % pool_size == 0: pc_chunks.append(i_chunk) i_chunk = [] if i_chunk: pc_chunks.append(i_chunk) logger.debug( "Pre-splitting points control into {} chunks with the " "following chunk sizes: {}".format( len(pc_chunks), [len(x) for x in pc_chunks] ) ) return N, pc_chunks # pylint: disable=unused-argument def _reduce_kwargs(self, pc, **kwargs): """Placeholder for functions that need to reduce the global kwargs that they send to workers to reduce memory footprint Parameters ---------- pc : PointsControl PointsControl object for a single worker chunk kwargs : dict Kwargs for all gids that needs to be reduced before being sent to ``_run_single_worker()`` Returns ------- kwargs : dict Same as input but reduced just for the gids in pc """ return kwargs def _parallel_run( self, max_workers=None, pool_size=None, timeout=1800, **kwargs ): """Execute parallel compute. Parameters ---------- max_workers : None | int Number of workers. None will default to cpu count. pool_size : int Number of futures to submit to a single process pool for parallel futures. If ``None``, the pool size is set to ``os.cpu_count() * 2``. By default, ``None``. timeout : int | float Number of seconds to wait for parallel run iteration to complete before returning zeros. kwargs : dict Keyword arguments to self._run_single_worker(). """ if pool_size is None: pool_size = os.cpu_count() * 2 if max_workers is None: max_workers = os.cpu_count() logger.info( "Running parallel execution with max_workers={}".format( max_workers ) ) i = 0 N, pc_chunks = self._pre_split_pc(pool_size=pool_size) for j, pc_chunk in enumerate(pc_chunks): logger.debug( "Starting process pool for points control " "iteration {} out of {}".format(j + 1, len(pc_chunks)) ) failed_futures = False chunks = {} futures = [] loggers = [__name__, "reV.gen", "reV.econ", "reV"] with SpawnProcessPool( max_workers=max_workers, loggers=loggers ) as exe: for pc in pc_chunk: pc_kwargs = self._reduce_kwargs(pc, **kwargs) future = exe.submit( self._run_single_worker, pc, **pc_kwargs ) futures.append(future) chunks[future] = pc for future in futures: i += 1 try: result = future.result(timeout=timeout) except TimeoutError: failed_futures = True sites = chunks[future].project_points.sites result = self._handle_failed_future( future, i, sites, timeout ) self.out = result mem = psutil.virtual_memory() m = ( "Parallel run at iteration {0} out of {1}. " "Memory utilization is {2:.3f} GB out of {3:.3f} GB " "total ({4:.1f}% used, intended limit of {5:.1f}%)" .format( i, N, mem.used / 1e9, mem.total / 1e9, 100 * mem.used / mem.total, 100 * self.mem_util_lim, ) ) logger.info(m) if failed_futures: logger.info("Forcing pool shutdown after failed futures.") exe.shutdown(wait=False) logger.info("Forced pool shutdown complete.") self.flush() def _handle_failed_future(self, future, i, sites, timeout): """Handle a failed future and return zeros. Parameters ---------- future : concurrent.futures.Future Failed future to cancel. i : int Iteration number for logging sites : list List of site gids belonging to this failed future. timeout : int Number of seconds to wait for parallel run iteration to complete before returning zeros. """ w = ("Iteration {} hit the timeout limit of {} seconds! " "Passing zeros.".format(i, timeout)) logger.warning(w) warn(w, OutputWarning) site_out = dict.fromkeys(self.output_request, 0) result = dict.fromkeys(sites, site_out) try: cancelled = future.cancel() except Exception as e: w = "Could not cancel future! Received exception: {}".format(e) logger.warning(w) warn(w, ParallelExecutionWarning) if not cancelled: w = "Could not cancel future!" logger.warning(w) warn(w, ParallelExecutionWarning) return result