# -*- coding: utf-8 -*-
"""
reV Base Configuration Framework
"""
import json
import logging
import os
from pathlib import Path
from rex.utilities.utilities import get_class_properties, unstupify_path
from gaps.config import load_config
from reV.utilities.exceptions import ConfigError
logger = logging.getLogger(__name__)
REVDIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
TESTDATADIR = os.path.join(os.path.dirname(REVDIR), 'tests', 'data')
[docs]class BaseConfig(dict):
"""Base class for configuration frameworks."""
REQUIREMENTS = ()
"""Required keys for config"""
STR_REP = {'REVDIR': REVDIR,
'TESTDATADIR': TESTDATADIR}
"""Mapping of config inputs (keys) to desired replacements (values) in
addition to relative file paths as demarcated by ./ and ../"""
def __init__(self, config, check_keys=True, perform_str_rep=True):
"""
Parameters
----------
config : str | dict
File path to config json (str), serialized json object (str),
or dictionary with pre-extracted config.
check_keys : bool, optional
Flag to check config keys against Class properties, by default True
perform_str_rep : bool
Flag to perform string replacement for REVDIR, TESTDATADIR, and ./
"""
# str_rep is a mapping of config strings to replace with real values
self._perform_str_rep = perform_str_rep
self._name = None
self._config_dir = None
self._log_level = None
self._parse_config(config)
self._preflight()
self._keys = self._get_properties()
if check_keys:
self._check_keys()
@property
def config_dir(self):
"""Get the directory that the config file is in.
Returns
-------
config_dir : str
Directory path that the config file is in.
"""
return self._config_dir
@property
def config_keys(self):
"""
List of valid config keys
Returns
-------
list
"""
return self._keys
@property
def log_level(self):
"""Get user-specified "log_level" (DEBUG, INFO, WARNING, etc...).
Returns
-------
log_level : int
Python logging module level (integer format) corresponding to the
config-specified log level string.
"""
if self._log_level is None:
levels = {'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL,
}
x = str(self.get('log_level', 'INFO'))
self._log_level = levels[x.upper()]
return self._log_level
@property
def name(self):
"""Get the job name, defaults to 'rev'.
Returns
-------
name : str
reV job name.
"""
return self._name or 'rev'
def _preflight(self):
"""Run a preflight check on the config."""
if 'project_control' in self:
msg = ('config "project_control" block is no '
'longer used. All project control keys should be placed at '
'the top config level.')
logger.error(msg)
raise ConfigError(msg)
missing = []
for req in self.REQUIREMENTS:
if req not in self:
missing.append(req)
if any(missing):
e = ('{} missing the following keys: {}'
.format(self.__class__.__name__, missing))
logger.error(e)
raise ConfigError(e)
@classmethod
def _get_properties(cls):
"""
Get all class properties
Used to check against config keys
Returns
-------
properties : list
List of class properties, each of which should represent a valid
config key/entry
"""
return get_class_properties(cls)
def _check_keys(self):
"""
Check on config keys to ensure they match available
properties
"""
for key in self.keys():
if isinstance(key, str) and key not in self._keys:
msg = ('{} is not a valid config entry for {}! Must be one of:'
'\n{}'.format(key, self.__class__.__name__, self._keys))
logger.error(msg)
raise ConfigError(msg)
[docs] def check_overwrite_keys(self, primary_key, *overwrite_keys):
"""
Check for overwrite keys and raise a ConfigError if present
Parameters
----------
primary_key : str
Primary key that overwrites overwrite_keys, used for error message
overwrite_keys : str
Key(s) to overwrite
"""
overwrite = []
for key in overwrite_keys:
if key in self:
overwrite.append(key)
if overwrite:
msg = ('A value for "{}" was provided which overwrites the '
' following key: "{}", please remove them from the config'
.format(primary_key, ', '.join(overwrite)))
logger.error(msg)
raise ConfigError(msg)
def _parse_config(self, config):
"""Parse a config input and set appropriate instance attributes.
Parameters
----------
config : str | dict
File path to config json (str), serialized json object (str),
or dictionary with pre-extracted config.
"""
# str is either json file path or serialized json object
if isinstance(config, str):
try:
# attempt to deserialize JSON-style string
config = json.loads(config)
except json.JSONDecodeError:
self._config_dir = os.path.dirname(unstupify_path(config))
self._config_dir += '/'
self._config_dir = self._config_dir.replace('\\', '/')
config = load_config(config)
# Perform string replacement, save config to self instance
if self._perform_str_rep:
config = self.str_replace_and_resolve(config, self.STR_REP)
self.set_self_dict(config)
[docs] @staticmethod
def check_files(flist):
"""Make sure all files in the input file list exist.
Parameters
----------
flist : list
List of files (with paths) to check existance of.
"""
for f in flist:
# ignore files that are to be specified using pipeline utils
if 'PIPELINE' not in os.path.basename(f):
if os.path.exists(f) is False:
raise IOError('File does not exist: {}'.format(f))
[docs] def str_replace_and_resolve(self, d, str_rep):
"""Perform a deep string replacement and path resolve in d.
Parameters
----------
d : dict
Config dictionary potentially containing strings to replace
and/or paths to resolve.
str_rep : dict
Replacement mapping where keys are strings to search for and
values are the new values.
Returns
-------
d : dict
Config dictionary with updated strings.
"""
if isinstance(d, dict):
# go through dict keys and values
for key, val in d.items():
d[key] = self.str_replace_and_resolve(val, str_rep)
elif isinstance(d, list):
# if the value is also a list, iterate through
for i, entry in enumerate(d):
d[i] = self.str_replace_and_resolve(entry, str_rep)
elif isinstance(d, str):
# if val is a str, check to see if str replacements apply
for old_str, new in str_rep.items():
# old_str is in the value, replace with new value
d = d.replace(old_str, new)
# `resolve_path` is safe to call on any string,
# even if it is not a path
d = self.resolve_path(d)
# return updated
return d
[docs] def set_self_dict(self, dictlike):
"""Save a dict-like variable as object instance dictionary items.
Parameters
----------
dictlike : dict
Python namespace object to set to this dictionary-emulating class.
"""
for key, val in dictlike.items():
self[key] = val
[docs] def resolve_path(self, path):
"""Resolve a file path represented by the input string.
This function resolves the input string if it resembles a path.
Specifically, the string will be resolved if it starts with
"``./``" or "``..``", or it if it contains either "``./``" or
"``..``" somewhere in the string body. Otherwise, the string
is returned unchanged, so this function *is* safe to call on any
string, even ones that do not resemble a path.
This method delegates the "resolving" logic to
:meth:`pathlib.Path.resolve`. This means the path is made
absolute, symlinks are resolved, and "``..``" components are
eliminated. If the ``path`` input starts with "``./``" or
"``..``", it is assumed to be w.r.t the config directory, *not*
the run directory.
Parameters
----------
path : str
Input file path.
Returns
-------
str
The resolved path.
"""
if path.startswith('./'):
path = (self.config_dir / Path(path[2:]))
elif path.startswith('..'):
path = (self.config_dir / Path(path))
elif './' in path: # this covers both './' and '../'
path = Path(path)
try:
path = path.resolve().as_posix()
except AttributeError: # `path` is still a `str`
pass
return path