Source code for reVX.plexos.dpv_plexos

# -*- coding: utf-8 -*-
"""
Distribute PV to PLEXOS methods.
"""
import shutil
import os
import logging
from warnings import warn
import numpy as np
import pandas as pd
from scipy.spatial import cKDTree

from reVX.handlers.outputs import Outputs

logger = logging.getLogger(__name__)


class DpvResource:
    """Framework to handle and merge multiple reV distributed PV runs."""

    def __init__(self, root_dir, year, sub_dirs=None):
        """
        Parameters
        ----------
        root_dir : str
            Root directory containing sub directories containing reV dpv
            job files.
        year : int | str
            Year of interest.
        sub_dirs : list
            List of strings of directory names in root_dir. Each sub_dir
            is a job directory.
        """
        self._root_dir = root_dir
        self._year = year
        self._sub_dirs = self._parse_sub_dirs(self._root_dir,
                                              sub_dirs=sub_dirs)
        self._fpaths = self._get_all_fpaths()

    @staticmethod
    def _parse_sub_dirs(root_dir, sub_dirs=None):
        """Get a list of strings of directory names in root_dir.

        Parameters
        ----------
        root_dir : str
            Root directory containing sub directories containing reV dpv
            job files.
        sub_dirs : list | None
            List of strings of directory names in root_dir. Each sub_dir
            is a job directory. None to parse dirs.

        Returns
        -------
        sub_dirs : list
            List of strings of directory names in root_dir. Each sub_dir
            is a job directory.
        """
        if sub_dirs is None:
            sub_dirs = os.listdir(root_dir)

        sub_dirs = [s for s in sub_dirs
                    if os.path.isdir(os.path.join(root_dir, s))]

        return sub_dirs

    def _get_sub_dir_fpath(self, sub_dir):
        """Get the full h5 filepath of the job in the target sub dir.

        Parameters
        ----------
        sub_dir : str
            Directory name in root_dir.

        Returns
        -------
        fpath : str
            Full filepath to the h5 job file in root_dir/sub_dir/
        """
        fpaths = []
        d = os.path.join(self._root_dir, sub_dir)
        for fn in os.listdir(d):
            if (fn.endswith('.h5')
                    and str(self._year) in fn
                    and 'node' not in fn):
                fpaths.append(os.path.join(d, fn))

        fpath = fpaths[0]
        if len(fpaths) > 1:
            wmsg = 'More than one h5 filepath found. Using: {}'.format(fpath)
            logger.warning(wmsg)
            warn(wmsg)

        return fpath

    def _get_all_fpaths(self):
        """Get a list of all job filepaths in all sub directories.

        Returns
        -------
        fpaths : list
            List of full filepaths to h5 job files in root_dir/sub_dir/
        """
        fpaths = []
        for sub_dir in self._sub_dirs:
            fpaths.append(self._get_sub_dir_fpath(sub_dir))

        return fpaths

    def _merge_data(self, dset, job_frac_map):
        """Merge reV data from multiple dpv jobs based on relative fractions.

        Example: for a batch job that runs multiple DPV jobs at different
        tilt/azimuths, this method will combine all of the different jobs
        into a single composite DPV result based on the respective fractions
        in the job_frac_map.

        Parameters
        ----------
        dset : str
            Dataset name to merge (i.e. cf_mean, cf_profile, etc...)
        job_frac_map : dict
            Lookup of the fractional contribution for each job. The keys
            are job names (string tags found in the sub_dirs and job h5
            files), and the values are fractions for the respective jobs.

        Returns
        -------
        arr : np.ndarray
            Unscaled numpy array of dset data merged from all fpaths based
            on the job fractions.
        """
        arr = None
        total_frac = 0
        for fpath in self._fpaths:
            apply_frac = None
            for job_tag, frac in job_frac_map.items():
                if job_tag in os.path.basename(fpath):
                    apply_frac = frac
                    total_frac += frac
                    break

            if apply_frac is not None:
                logger.debug('Merging "{}" with frac {} from: {}'
                             .format(dset, apply_frac,
                                     os.path.basename(fpath)))
                if arr is None:
                    with Outputs(fpath) as out:
                        arr = (apply_frac * out[dset])
                else:
                    with Outputs(fpath) as out:
                        arr += (apply_frac * out[dset])

        if np.abs(total_frac - 1) > 0.01:
            wmsg = ('Fractions sum to {} (should be 1.00) when merging '
                    'DPV jobs: {}'.format(total_frac, self._fpaths))
            warn(wmsg)
            logger.warning(wmsg)

        return arr

    def _merge_dpv_files(self, fn_out, job_frac_map):
        """Merge multiple DPV reV runs into a single result based on
        fractional contributions.

        Parameters
        ----------
        fn_out : str
            Filename for merged results, to be saved in root_dir.
        job_frac_map : dict
            Lookup of the fractional contribution for each job. The keys
            are job names (string tags found in the sub_dirs and job h5
            files), and the values are fractions for the respective jobs.
        """
        fpath_out = os.path.join(self._root_dir, fn_out)
        shutil.copy(self._fpaths[0], fpath_out)
        with Outputs(fpath_out) as out:
            dsets = [d for d in out.datasets
                     if d not in ['time_index', 'meta']]

        for dset in dsets:
            arr = self._merge_data(dset, job_frac_map)
            logger.debug('Writing "{}" to merged DPV output file.'
                         .format(dset))
            with Outputs(fpath_out, mode='a') as out:
                out[dset] = arr

    @classmethod
    def merge(cls, root_dir, year, fn_out, job_frac_map):
        """Merge multiple DPV reV runs into a single result based on
        fractional contributions.

        Parameters
        ----------
        root_dir : str
            Root directory containing sub directories containing reV dpv
            job files.
        year : int | str
            Year of interest.
        fn_out : str
            Filename for merged results, to be saved in root_dir.
        job_frac_map : dict
            Lookup of the fractional contribution for each job. The keys
            are job names (string tags found in the sub_dirs and job h5
            files), and the values are fractions for the respective jobs.
        """
        dpv = cls(root_dir, year, sub_dirs=list(job_frac_map.keys()))
        dpv._merge_dpv_files(fn_out, job_frac_map)
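
# Illustrative usage sketch (not part of the module): merging two
# hypothetical DPV jobs with DpvResource.merge(). The root directory, job
# tags ('az180_tilt20', 'az180_tilt40'), and fractions below are assumptions
# for demonstration only. Each job tag must appear in both its sub directory
# name and its job h5 filename, and the fractions should sum to 1.0 or a
# warning is raised.
#
#     from reVX.plexos.dpv_plexos import DpvResource
#
#     job_frac_map = {'az180_tilt20': 0.6,   # 60% of composite DPV result
#                     'az180_tilt40': 0.4}   # 40% of composite DPV result
#     DpvResource.merge('/scratch/dpv_runs', 2012, 'dpv_merged_2012.h5',
#                       job_frac_map)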

class DpvPlexosAggregation:
    """Methods to aggregate DPV profiles to plexos nodes."""

    def __init__(self, node_buildout, cf_fpath):
        """
        Parameters
        ----------
        node_buildout : str | pd.DataFrame
            Plexos node buildout meta data. Must have the following columns:
            [plexos_id, latitude, longitude, built_capacity]
        cf_fpath : str
            Filepath to a reV capacity factor file.
        """
        logger.info('Starting DPV to PLEXOS aggregation.')
        self._parse_node_buildout(node_buildout)
        self._cf_fpath = cf_fpath
        self._cf_meta = None
        self._time_index = None
        self._kdtree = None
        logger.debug('Using CF file: {}'.format(self._cf_fpath))

    def _parse_node_buildout(self, node_buildout,
                             req=('plexos_id', 'latitude', 'longitude',
                                  'built_capacity')):
        """Parse the node buildout table.

        Parameters
        ----------
        node_buildout : str | pd.DataFrame
            Plexos node buildout meta data.
        req : tuple
            Required column names in node_buildout.
        """
        if isinstance(node_buildout, str):
            logger.debug('Using PLEXOS node file: {}'.format(node_buildout))
            self._node_buildout = pd.read_csv(node_buildout)
        elif isinstance(node_buildout, pd.DataFrame):
            self._node_buildout = node_buildout
        else:
            raise TypeError('Did not recognize the type of the node buildout '
                            'table input: {}'.format(type(node_buildout)))

        missing = []
        for r in req:
            if r not in self._node_buildout:
                missing.append(r)

        if any(missing):
            raise KeyError('Node buildout table is missing the following '
                           'columns: {}'.format(missing))

        # drop node buildouts without capacity
        bad = (pd.isnull(self._node_buildout['built_capacity'])
               | (self._node_buildout['built_capacity'] == 0))
        bad = self._node_buildout.index.values[bad]
        self._node_buildout = self._node_buildout.drop(bad, axis=0)
        self._node_buildout = self._node_buildout.reset_index(drop=True)

    @property
    def cf_meta(self):
        """Get the cf meta data.

        Returns
        -------
        _cf_meta : pd.DataFrame
            reV CF meta data.
        """
        if self._cf_meta is None:
            with Outputs(self._cf_fpath) as out:
                self._cf_meta = out.meta

        return self._cf_meta

    @property
    def time_index(self):
        """Get the cf time index.

        Returns
        -------
        _time_index : pd.DatetimeIndex
            reV time index data.
        """
        if self._time_index is None:
            with Outputs(self._cf_fpath) as out:
                self._time_index = out.time_index

        return self._time_index

    @property
    def kdtree(self):
        """Get the KDTree of the reV CF meta data coordinates.

        Returns
        -------
        _kdtree : scipy.spatial.cKDTree
            KDTree built from the reV CF meta data coordinates.
        """
        if self._kdtree is None:
            cols = ['latitude', 'longitude']
            # pylint: disable=not-callable
            self._kdtree = cKDTree(self.cf_meta[cols].values)

        return self._kdtree

    def _run_nn(self, query):
        """Run a KDTree query, getting CF index values matching the node
        buildout.

        Parameters
        ----------
        query : np.ndarray
            Query the kdtree attribute with this argument.

        Returns
        -------
        i : np.ndarray
            1D array of CF index values.
        """
        d, i = self.kdtree.query(query, k=1)
        logger.info('KDTree distance min / mean / max: {} / {} / {}'
                    .format(np.round(d.min(), decimals=3),
                            np.round(d.mean(), decimals=3),
                            np.round(d.max(), decimals=3)))

        return i

    def get_mapped_node_data(self, dset='cf_profile'):
        """Get a CF dataset mapped to plexos nodes via NN results.

        Parameters
        ----------
        dset : str
            Dataset name in the reV CF file to map to the node buildout.

        Returns
        -------
        node_arr : np.ndarray
            2D dataset from the reV cf file mapped to the node_buildout
            table via NN.
        """
        with Outputs(self._cf_fpath) as out:
            arr = out[dset]

        i = self._run_nn(self._node_buildout[['latitude',
                                              'longitude']].values)
        node_arr = arr[:, i]

        return node_arr

    def get_node_gen_profiles(self):
        """Get the generation profiles (cf * capacity) mapped to the node
        meta.

        Returns
        -------
        gen_profiles : np.ndarray
            Timeseries profiles of shape (t, n) where t is the time index
            length and n is the number of nodes in the buildout. Units are
            the built_capacity units from the node_buildout table.
        """
        node_arr = self.get_mapped_node_data(dset='cf_profile')
        built_capacity = self._node_buildout['built_capacity'].values
        gen_profiles = node_arr * built_capacity

        return gen_profiles

    @classmethod
    def run(cls, node_buildout, cf_fpath, out_fpath):
        """Aggregate dpv reV gen data to the buildout nodes and save to h5.

        Parameters
        ----------
        node_buildout : str | pd.DataFrame
            Plexos node buildout meta data. Must have the following columns:
            [plexos_id, latitude, longitude, built_capacity]
        cf_fpath : str
            Filepath to a reV capacity factor file.
        out_fpath : str
            Output filepath.
        """
        pa = cls(node_buildout, cf_fpath)
        profiles = pa.get_node_gen_profiles()

        logger.info('Writing results to file: {}'.format(out_fpath))
        with Outputs(out_fpath, mode='w') as out:
            out['dpv/meta'] = pa._node_buildout
            out['dpv/time_index'] = pa.time_index
            out._create_dset('dpv/gen_profiles', profiles.shape,
                             profiles.dtype, chunks=(None, 100),
                             data=profiles)

        logger.info('Finished DPV to PLEXOS aggregation.')
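
# Illustrative usage sketch (not part of the module): aggregating a merged
# DPV CF file to a PLEXOS node buildout with DpvPlexosAggregation.run().
# The filepaths and the buildout table values below are assumptions for
# demonstration only. Each node is mapped to its nearest-neighbor reV CF
# pixel via the KDTree, and gen profiles are computed as
# cf_profile * built_capacity.
#
#     import pandas as pd
#     from reVX.plexos.dpv_plexos import DpvPlexosAggregation
#
#     node_buildout = pd.DataFrame({'plexos_id': [101, 102],
#                                   'latitude': [39.74, 40.02],
#                                   'longitude': [-105.00, -105.30],
#                                   'built_capacity': [50.0, 0.0]})
#
#     # the zero-capacity node is dropped when the table is parsed
#     DpvPlexosAggregation.run(node_buildout,
#                              '/scratch/dpv_merged_2012.h5',
#                              '/scratch/dpv_plexos_2012.h5')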