Module buildstock_query.tools.logic_parser

Expand source code
from functools import reduce
import itertools as it
import yaml
from collections import defaultdict
import json
from buildstock_query.helpers import read_csv


class LogicParser:
    def __init__(self, opt_sat_path, yaml_file) -> None:
        opt_df = read_csv(opt_sat_path)
        opt_df = opt_df[opt_df["Saturation"] > 0]
        self.available_opts = opt_df.groupby("Parameter")['Option'].agg(set).to_dict()
        self.yaml_file = yaml_file

    def get_cfg(self) -> dict:
        """Get the buildstock configuration file as a dictionary object.

        Returns:
            dict: The buildstock configuration file.
        """
        with open(self.yaml_file) as f:
            config = yaml.load(f, Loader=yaml.SafeLoader)
        return config

    def get_apply_logics(self, upgrade_num, option_name):
        """Get the apply logic for a given upgrade number and option.

        Args:
            upgrade_num (int): The upgrade number.
            option_name (str): The option name.

        Returns:
            dict: The apply logic for the given upgrade number and option name.
        """
        config = self.get_cfg()
        upgrade = config["upgrades"][upgrade_num - 1]
        opt2logic = dict()
        for opt in upgrade["options"]:
            para, _ = self._get_para_option(opt["option"])
            if para == option_name:
                if "apply_logic" in opt and "package_apply_logic" in upgrade:
                    logic = {"and": [opt["apply_logic"], upgrade["package_apply_logic"]]}
                elif "apply_logic" in opt:
                    logic = opt["apply_logic"]
                else:
                    logic = upgrade["package_apply_logic"]
                if opt["option"] in opt2logic:
                    opt2logic[opt["option"]] = {"or": [opt2logic[opt["option"]], logic]}
                else:
                    opt2logic[opt["option"]] = logic
        return opt2logic

    def get_upgrade_options_map(self):
        """Get list of all options for all upgrades"""
        config = self.get_cfg()
        upgrade_options_map = defaultdict(set)
        for upgrade_num, upgrade in enumerate(config["upgrades"], start=1):
            for opt in upgrade["options"]:
                para, _ = self._get_para_option(opt["option"])
                upgrade_options_map[(upgrade_num, upgrade['upgrade_name'])].add(para)
        return upgrade_options_map

    def get_overlap_report(self):
        """
        For all the option in each upgrade, verify that there are no overlapping selection.
        """
        upgrade_options_map = self.get_upgrade_options_map()
        overlap_report = ""
        for (upgrade_num, upgrade_name), options in upgrade_options_map.items():
            for option in options:
                print(f"Verifying upgrade {upgrade_num} ({upgrade_name}), {option}")
                opt2logic = self.get_apply_logics(upgrade_num, option)
                for opt1, opt2 in it.combinations(opt2logic.keys(), 2):
                    overlap = self.get_overlapping_selections(opt2logic[opt1], opt2logic[opt2])
                    if overlap:
                        overlap_report += f"Upgrade {upgrade_num} ({upgrade_name}),\
                                            has overlapping selections for {opt1} and {opt2}:\n"
                        overlap_report += json.dumps(overlap, indent=2) + "\n"
        return overlap_report

    @staticmethod
    def _get_para_option(condition):
        try:
            para, option = condition.split("|")
        except ValueError as e:
            raise ValueError(f"Condition {condition} is invalid") from e
        return para, option

    @staticmethod
    def and_dicts(dict1: dict[str, set[str]], dict2: dict[str, set[str]]):
        """
        Merge two dicts. If there is a conflict, the value in dict2 takes precedence.
        """
        # l1 = {'p1': {'o1', 'o2', 'o3'}, 'p2': {'o4', 'o5'}, 'p5': {'o9', 'o10'}}
        # l2 = {'p1': {'o3', 'o4'}, 'p2': {'o5', 'o6'}, 'p9': {'o11', 'o12'}}
        # l1 & l2 = {'p1': {'o3'}, 'p2': {'o5'}, 'p5': {'o9', 'o10'}, 'p9': {'o11', 'o12'}}
        new_dict: dict[str, set[str]] = {}
        for key in sorted(set(dict1.keys()) | set(dict2.keys())):
            if key in dict1 and key in dict2:
                new_dict[key] = dict1[key].intersection(dict2[key])
            elif key in dict1:
                new_dict[key] = dict1[key]
            else:
                new_dict[key] = dict2[key]
        return new_dict

    @staticmethod
    def and_(selections1: list[dict[str, set[str]]], selections2: list[dict[str, set[str]]]):
        """
        Merge two dicts. If there is a conflict, the value in dict2 takes precedence.
        """
        new_logic: list[dict[str, set[str]]] = []
        for dict1, dict2 in it.product(selections1, selections2):
            new_logic.append(LogicParser.and_dicts(dict1, dict2))
        return new_logic

    @staticmethod
    def _trim_selections(selections: list[dict[str, set[str]]]):
        """
        Remove any selections that are subsets of another selection or contains
        a key with no available values
        """
        new_selections = []
        keys2seen_selections: dict[tuple, list[dict[str, set[str]]]] = dict()

        def is_subset(sel):
            sel_keys = tuple(sorted(sel.keys()))
            keys_combi = (keys for count in range(1, len(sel_keys) + 1) for keys in it.combinations(sel_keys, count))
            for keys in keys_combi:
                if keys in keys2seen_selections:
                    for seen_selection in keys2seen_selections[keys]:
                        if all(sel[key] <= seen_selection[key] for key in keys):
                            return True
            return False

        for selection in sorted(selections, key=lambda x: len(x)):
            if any(len(selection[key]) == 0 for key in selection):
                continue
            if not is_subset(selection):
                keys2seen_selections[tuple(sorted(selection.keys()))] = [selection]
                new_selections.append(selection)
        return new_selections

    @staticmethod
    def _compress_selections(selections: list[dict[str, set[str]]]):
        """
        If there are multiple selections with the same keys, and same values except for one key,
        merge them into one selection to reduce the number of selections.
        For example: [{'p1': {'o1', 'o2', 'o3'}, 'p2': {'o4', 'o5'}},
                      {'p1': {'o1', 'o2', 'o3'}, 'p2': {'o4', 'o6'}}] will be merged into
                     [{'p1': {'o1', 'o2', 'o3'}, 'p2': {'o4', 'o5', 'o6'}}]
        """
        keys2seen_selections: dict[tuple, list[dict[str, set[str]]]] = dict()
        for sel_dict in selections:
            keys = tuple(sorted(sel_dict.keys()))
            if keys in keys2seen_selections:
                for seen_selection in keys2seen_selections[keys]:
                    matching_keys = {key for key in keys if seen_selection[key] == sel_dict[key]}
                    if len(matching_keys) == len(keys):
                        # If all keys match, we have a duplicate sel_dict
                        break
                    elif len(matching_keys) == len(keys) - 1:
                        # sel_dict has one key with different value from before. Simply update the record
                        key = tuple(set(keys) - matching_keys)[0]
                        seen_selection[key] |= sel_dict[key]
                        break
                else:
                    # If we did not break out of the loop, we have a new selection that can't be merged with
                    # any of the previous selections
                    keys2seen_selections[keys].append(sel_dict)
            else:
                keys2seen_selections[keys] = [sel_dict]
        return list(selection for seen_selections in keys2seen_selections.values() for selection in seen_selections)

    @staticmethod
    def clean_selections(selections: list[dict[str, set[str]]]):
        """
        Repeatedly compress and trim selections until there is no change.
        """
        while True:
            new_selections = LogicParser._compress_selections(LogicParser._trim_selections(selections))
            if len(new_selections) == len(selections):
                return new_selections
            selections = new_selections

    @staticmethod
    def or_(selections1: list[dict[str, set[str]]], selections2: list[dict[str, set[str]]]):
        """
        Merge two dicts. If there is a conflict, the value in dict2 takes precedence.
        """
        return selections1 + selections2

    def inverse_selection(self, selection: dict):
        return list({k: self.available_opts[k] - v - {"Void"}} for k, v in selection.items())

    def not_(self, logic1):
        """
        Merge two dicts. If there is a conflict, the value in dict2 takes precedence.
        """
        prarsed_logic = self.prase_logic(logic1)
        return_val = list(reduce(self.and_, [self.inverse_selection(selection) for selection in prarsed_logic]))
        return return_val

    def _normalize_lists(self, logic, parent=None):
        """Any list that is not in a or block is considered to be in an and block.
            This block will normalize this pattern by adding "and" wherever required.
        Args:
            logic (_type_): Logic structure (dict, list etc)
            parent (_type_, optional): The parent of the current logic block. If it is a list, and there is no parent,
            the list will be wrapped in a and block.

        Returns:
            _type_: _description_
        """
        if isinstance(logic, list):
            # If it is a single element list, just unwrap and return
            if len(logic) == 1:
                return self._normalize_lists(logic[0])
            new_logic = [self._normalize_lists(el) for el in logic]
            return {"and": new_logic} if parent is None else new_logic
        elif isinstance(logic, dict):
            new_dict = {key: self._normalize_lists(value, parent=key) for key, value in logic.items()}
            return new_dict
        else:
            return logic

    def prase_logic(self, logic):
        """
        Convert the parameter|option logic in the yaml file into the format the apply upgrade measure understands.

        :param logic: dict, list, or string with downselection logic in it
        :returns: str of logic
        """
        logic = self._normalize_lists(logic)
        if isinstance(logic, dict):
            assert (len(logic) == 1)
            key = list(logic.keys())[0]
            val = logic[key]
            val = [val] if not isinstance(val, list) else val
            if key == 'and':
                and_result = list(reduce(self.and_, (self.prase_logic(block) for block in val)))
                return self.clean_selections(and_result)
            elif key == 'or':
                or_result = list(reduce(self.or_, (self.prase_logic(block) for block in val)))
                return self.clean_selections(or_result)
            elif key == 'not':
                and_result = list(reduce(self.and_, (self.prase_logic(block) for block in val)))
                not_val = list(reduce(self.and_, [self.inverse_selection(selection) for selection in and_result]))
                return self.clean_selections(not_val)
        elif isinstance(logic, list):
            list_val = list(reduce(self.and_, (self.prase_logic(block) for block in logic)))
            return self.clean_selections(list_val)
        elif isinstance(logic, str):
            para, option = self._get_para_option(logic)
            return [{para: {option}}]
        raise ValueError(f"Logic {logic} is invalid")

    def retrieve_logic(self, selections: list[dict[str, set[str]]]):
        """
        return back the logic from the selections
        """
        outer_dict = {'or': []}
        for selection in selections:
            inner_dict = {'and': []}
            for key, values in selection.items():
                if len(values) > 1:
                    inner_dict['and'].append({'or': [f"{key}|{value}" for value in values]})
                elif len(values) == 1:
                    inner_dict['and'].append(f"{key}|{list(values)[0]}")
                else:
                    raise ValueError(f"Selection {selection} has no valid value for {key}")
            if len(inner_dict['and']) == 1:
                outer_dict['or'].append(inner_dict['and'][0])
            else:
                outer_dict['or'].append(inner_dict)
        if len(outer_dict['or']) == 1:
            return outer_dict['or'][0]
        return outer_dict

    def normalize_logic(self, logic):
        selections = self.prase_logic(logic)
        return self.retrieve_logic(selections)

    def get_overlapping_selections(self, logic1, logic2):
        """
        Get the selections that are common to both logic1 and logic2
        """
        selections = self.prase_logic({"and": [logic1, logic2]})
        if len(selections) == 0:
            return None
        selections = self.clean_selections(selections)
        return self.retrieve_logic(selections)

Classes

class LogicParser (opt_sat_path, yaml_file)
Expand source code
class LogicParser:
    def __init__(self, opt_sat_path, yaml_file) -> None:
        opt_df = read_csv(opt_sat_path)
        opt_df = opt_df[opt_df["Saturation"] > 0]
        self.available_opts = opt_df.groupby("Parameter")['Option'].agg(set).to_dict()
        self.yaml_file = yaml_file

    def get_cfg(self) -> dict:
        """Get the buildstock configuration file as a dictionary object.

        Returns:
            dict: The buildstock configuration file.
        """
        with open(self.yaml_file) as f:
            config = yaml.load(f, Loader=yaml.SafeLoader)
        return config

    def get_apply_logics(self, upgrade_num, option_name):
        """Get the apply logic for a given upgrade number and option.

        Args:
            upgrade_num (int): The upgrade number.
            option_name (str): The option name.

        Returns:
            dict: The apply logic for the given upgrade number and option name.
        """
        config = self.get_cfg()
        upgrade = config["upgrades"][upgrade_num - 1]
        opt2logic = dict()
        for opt in upgrade["options"]:
            para, _ = self._get_para_option(opt["option"])
            if para == option_name:
                if "apply_logic" in opt and "package_apply_logic" in upgrade:
                    logic = {"and": [opt["apply_logic"], upgrade["package_apply_logic"]]}
                elif "apply_logic" in opt:
                    logic = opt["apply_logic"]
                else:
                    logic = upgrade["package_apply_logic"]
                if opt["option"] in opt2logic:
                    opt2logic[opt["option"]] = {"or": [opt2logic[opt["option"]], logic]}
                else:
                    opt2logic[opt["option"]] = logic
        return opt2logic

    def get_upgrade_options_map(self):
        """Get list of all options for all upgrades"""
        config = self.get_cfg()
        upgrade_options_map = defaultdict(set)
        for upgrade_num, upgrade in enumerate(config["upgrades"], start=1):
            for opt in upgrade["options"]:
                para, _ = self._get_para_option(opt["option"])
                upgrade_options_map[(upgrade_num, upgrade['upgrade_name'])].add(para)
        return upgrade_options_map

    def get_overlap_report(self):
        """
        For all the option in each upgrade, verify that there are no overlapping selection.
        """
        upgrade_options_map = self.get_upgrade_options_map()
        overlap_report = ""
        for (upgrade_num, upgrade_name), options in upgrade_options_map.items():
            for option in options:
                print(f"Verifying upgrade {upgrade_num} ({upgrade_name}), {option}")
                opt2logic = self.get_apply_logics(upgrade_num, option)
                for opt1, opt2 in it.combinations(opt2logic.keys(), 2):
                    overlap = self.get_overlapping_selections(opt2logic[opt1], opt2logic[opt2])
                    if overlap:
                        overlap_report += f"Upgrade {upgrade_num} ({upgrade_name}),\
                                            has overlapping selections for {opt1} and {opt2}:\n"
                        overlap_report += json.dumps(overlap, indent=2) + "\n"
        return overlap_report

    @staticmethod
    def _get_para_option(condition):
        try:
            para, option = condition.split("|")
        except ValueError as e:
            raise ValueError(f"Condition {condition} is invalid") from e
        return para, option

    @staticmethod
    def and_dicts(dict1: dict[str, set[str]], dict2: dict[str, set[str]]):
        """
        Merge two dicts. If there is a conflict, the value in dict2 takes precedence.
        """
        # l1 = {'p1': {'o1', 'o2', 'o3'}, 'p2': {'o4', 'o5'}, 'p5': {'o9', 'o10'}}
        # l2 = {'p1': {'o3', 'o4'}, 'p2': {'o5', 'o6'}, 'p9': {'o11', 'o12'}}
        # l1 & l2 = {'p1': {'o3'}, 'p2': {'o5'}, 'p5': {'o9', 'o10'}, 'p9': {'o11', 'o12'}}
        new_dict: dict[str, set[str]] = {}
        for key in sorted(set(dict1.keys()) | set(dict2.keys())):
            if key in dict1 and key in dict2:
                new_dict[key] = dict1[key].intersection(dict2[key])
            elif key in dict1:
                new_dict[key] = dict1[key]
            else:
                new_dict[key] = dict2[key]
        return new_dict

    @staticmethod
    def and_(selections1: list[dict[str, set[str]]], selections2: list[dict[str, set[str]]]):
        """
        Merge two dicts. If there is a conflict, the value in dict2 takes precedence.
        """
        new_logic: list[dict[str, set[str]]] = []
        for dict1, dict2 in it.product(selections1, selections2):
            new_logic.append(LogicParser.and_dicts(dict1, dict2))
        return new_logic

    @staticmethod
    def _trim_selections(selections: list[dict[str, set[str]]]):
        """
        Remove any selections that are subsets of another selection or contains
        a key with no available values
        """
        new_selections = []
        keys2seen_selections: dict[tuple, list[dict[str, set[str]]]] = dict()

        def is_subset(sel):
            sel_keys = tuple(sorted(sel.keys()))
            keys_combi = (keys for count in range(1, len(sel_keys) + 1) for keys in it.combinations(sel_keys, count))
            for keys in keys_combi:
                if keys in keys2seen_selections:
                    for seen_selection in keys2seen_selections[keys]:
                        if all(sel[key] <= seen_selection[key] for key in keys):
                            return True
            return False

        for selection in sorted(selections, key=lambda x: len(x)):
            if any(len(selection[key]) == 0 for key in selection):
                continue
            if not is_subset(selection):
                keys2seen_selections[tuple(sorted(selection.keys()))] = [selection]
                new_selections.append(selection)
        return new_selections

    @staticmethod
    def _compress_selections(selections: list[dict[str, set[str]]]):
        """
        If there are multiple selections with the same keys, and same values except for one key,
        merge them into one selection to reduce the number of selections.
        For example: [{'p1': {'o1', 'o2', 'o3'}, 'p2': {'o4', 'o5'}},
                      {'p1': {'o1', 'o2', 'o3'}, 'p2': {'o4', 'o6'}}] will be merged into
                     [{'p1': {'o1', 'o2', 'o3'}, 'p2': {'o4', 'o5', 'o6'}}]
        """
        keys2seen_selections: dict[tuple, list[dict[str, set[str]]]] = dict()
        for sel_dict in selections:
            keys = tuple(sorted(sel_dict.keys()))
            if keys in keys2seen_selections:
                for seen_selection in keys2seen_selections[keys]:
                    matching_keys = {key for key in keys if seen_selection[key] == sel_dict[key]}
                    if len(matching_keys) == len(keys):
                        # If all keys match, we have a duplicate sel_dict
                        break
                    elif len(matching_keys) == len(keys) - 1:
                        # sel_dict has one key with different value from before. Simply update the record
                        key = tuple(set(keys) - matching_keys)[0]
                        seen_selection[key] |= sel_dict[key]
                        break
                else:
                    # If we did not break out of the loop, we have a new selection that can't be merged with
                    # any of the previous selections
                    keys2seen_selections[keys].append(sel_dict)
            else:
                keys2seen_selections[keys] = [sel_dict]
        return list(selection for seen_selections in keys2seen_selections.values() for selection in seen_selections)

    @staticmethod
    def clean_selections(selections: list[dict[str, set[str]]]):
        """
        Repeatedly compress and trim selections until there is no change.
        """
        while True:
            new_selections = LogicParser._compress_selections(LogicParser._trim_selections(selections))
            if len(new_selections) == len(selections):
                return new_selections
            selections = new_selections

    @staticmethod
    def or_(selections1: list[dict[str, set[str]]], selections2: list[dict[str, set[str]]]):
        """
        Merge two dicts. If there is a conflict, the value in dict2 takes precedence.
        """
        return selections1 + selections2

    def inverse_selection(self, selection: dict):
        return list({k: self.available_opts[k] - v - {"Void"}} for k, v in selection.items())

    def not_(self, logic1):
        """
        Merge two dicts. If there is a conflict, the value in dict2 takes precedence.
        """
        prarsed_logic = self.prase_logic(logic1)
        return_val = list(reduce(self.and_, [self.inverse_selection(selection) for selection in prarsed_logic]))
        return return_val

    def _normalize_lists(self, logic, parent=None):
        """Any list that is not in a or block is considered to be in an and block.
            This block will normalize this pattern by adding "and" wherever required.
        Args:
            logic (_type_): Logic structure (dict, list etc)
            parent (_type_, optional): The parent of the current logic block. If it is a list, and there is no parent,
            the list will be wrapped in a and block.

        Returns:
            _type_: _description_
        """
        if isinstance(logic, list):
            # If it is a single element list, just unwrap and return
            if len(logic) == 1:
                return self._normalize_lists(logic[0])
            new_logic = [self._normalize_lists(el) for el in logic]
            return {"and": new_logic} if parent is None else new_logic
        elif isinstance(logic, dict):
            new_dict = {key: self._normalize_lists(value, parent=key) for key, value in logic.items()}
            return new_dict
        else:
            return logic

    def prase_logic(self, logic):
        """
        Convert the parameter|option logic in the yaml file into the format the apply upgrade measure understands.

        :param logic: dict, list, or string with downselection logic in it
        :returns: str of logic
        """
        logic = self._normalize_lists(logic)
        if isinstance(logic, dict):
            assert (len(logic) == 1)
            key = list(logic.keys())[0]
            val = logic[key]
            val = [val] if not isinstance(val, list) else val
            if key == 'and':
                and_result = list(reduce(self.and_, (self.prase_logic(block) for block in val)))
                return self.clean_selections(and_result)
            elif key == 'or':
                or_result = list(reduce(self.or_, (self.prase_logic(block) for block in val)))
                return self.clean_selections(or_result)
            elif key == 'not':
                and_result = list(reduce(self.and_, (self.prase_logic(block) for block in val)))
                not_val = list(reduce(self.and_, [self.inverse_selection(selection) for selection in and_result]))
                return self.clean_selections(not_val)
        elif isinstance(logic, list):
            list_val = list(reduce(self.and_, (self.prase_logic(block) for block in logic)))
            return self.clean_selections(list_val)
        elif isinstance(logic, str):
            para, option = self._get_para_option(logic)
            return [{para: {option}}]
        raise ValueError(f"Logic {logic} is invalid")

    def retrieve_logic(self, selections: list[dict[str, set[str]]]):
        """
        return back the logic from the selections
        """
        outer_dict = {'or': []}
        for selection in selections:
            inner_dict = {'and': []}
            for key, values in selection.items():
                if len(values) > 1:
                    inner_dict['and'].append({'or': [f"{key}|{value}" for value in values]})
                elif len(values) == 1:
                    inner_dict['and'].append(f"{key}|{list(values)[0]}")
                else:
                    raise ValueError(f"Selection {selection} has no valid value for {key}")
            if len(inner_dict['and']) == 1:
                outer_dict['or'].append(inner_dict['and'][0])
            else:
                outer_dict['or'].append(inner_dict)
        if len(outer_dict['or']) == 1:
            return outer_dict['or'][0]
        return outer_dict

    def normalize_logic(self, logic):
        selections = self.prase_logic(logic)
        return self.retrieve_logic(selections)

    def get_overlapping_selections(self, logic1, logic2):
        """
        Get the selections that are common to both logic1 and logic2
        """
        selections = self.prase_logic({"and": [logic1, logic2]})
        if len(selections) == 0:
            return None
        selections = self.clean_selections(selections)
        return self.retrieve_logic(selections)

Static methods

def and_(selections1: list[dict[str, set[str]]], selections2: list[dict[str, set[str]]])

Merge two dicts. If there is a conflict, the value in dict2 takes precedence.

Expand source code
@staticmethod
def and_(selections1: list[dict[str, set[str]]], selections2: list[dict[str, set[str]]]):
    """
    Merge two dicts. If there is a conflict, the value in dict2 takes precedence.
    """
    new_logic: list[dict[str, set[str]]] = []
    for dict1, dict2 in it.product(selections1, selections2):
        new_logic.append(LogicParser.and_dicts(dict1, dict2))
    return new_logic
def and_dicts(dict1: dict[str, set[str]], dict2: dict[str, set[str]])

Merge two dicts. If there is a conflict, the value in dict2 takes precedence.

Expand source code
@staticmethod
def and_dicts(dict1: dict[str, set[str]], dict2: dict[str, set[str]]):
    """
    Merge two dicts. If there is a conflict, the value in dict2 takes precedence.
    """
    # l1 = {'p1': {'o1', 'o2', 'o3'}, 'p2': {'o4', 'o5'}, 'p5': {'o9', 'o10'}}
    # l2 = {'p1': {'o3', 'o4'}, 'p2': {'o5', 'o6'}, 'p9': {'o11', 'o12'}}
    # l1 & l2 = {'p1': {'o3'}, 'p2': {'o5'}, 'p5': {'o9', 'o10'}, 'p9': {'o11', 'o12'}}
    new_dict: dict[str, set[str]] = {}
    for key in sorted(set(dict1.keys()) | set(dict2.keys())):
        if key in dict1 and key in dict2:
            new_dict[key] = dict1[key].intersection(dict2[key])
        elif key in dict1:
            new_dict[key] = dict1[key]
        else:
            new_dict[key] = dict2[key]
    return new_dict
def clean_selections(selections: list[dict[str, set[str]]])

Repeatedly compress and trim selections until there is no change.

Expand source code
@staticmethod
def clean_selections(selections: list[dict[str, set[str]]]):
    """
    Repeatedly compress and trim selections until there is no change.
    """
    while True:
        new_selections = LogicParser._compress_selections(LogicParser._trim_selections(selections))
        if len(new_selections) == len(selections):
            return new_selections
        selections = new_selections
def or_(selections1: list[dict[str, set[str]]], selections2: list[dict[str, set[str]]])

Merge two dicts. If there is a conflict, the value in dict2 takes precedence.

Expand source code
@staticmethod
def or_(selections1: list[dict[str, set[str]]], selections2: list[dict[str, set[str]]]):
    """
    Merge two dicts. If there is a conflict, the value in dict2 takes precedence.
    """
    return selections1 + selections2

Methods

def get_apply_logics(self, upgrade_num, option_name)

Get the apply logic for a given upgrade number and option.

Args

upgrade_num : int
The upgrade number.
option_name : str
The option name.

Returns

dict
The apply logic for the given upgrade number and option name.
Expand source code
def get_apply_logics(self, upgrade_num, option_name):
    """Get the apply logic for a given upgrade number and option.

    Args:
        upgrade_num (int): The upgrade number.
        option_name (str): The option name.

    Returns:
        dict: The apply logic for the given upgrade number and option name.
    """
    config = self.get_cfg()
    upgrade = config["upgrades"][upgrade_num - 1]
    opt2logic = dict()
    for opt in upgrade["options"]:
        para, _ = self._get_para_option(opt["option"])
        if para == option_name:
            if "apply_logic" in opt and "package_apply_logic" in upgrade:
                logic = {"and": [opt["apply_logic"], upgrade["package_apply_logic"]]}
            elif "apply_logic" in opt:
                logic = opt["apply_logic"]
            else:
                logic = upgrade["package_apply_logic"]
            if opt["option"] in opt2logic:
                opt2logic[opt["option"]] = {"or": [opt2logic[opt["option"]], logic]}
            else:
                opt2logic[opt["option"]] = logic
    return opt2logic
def get_cfg(self) ‑> dict

Get the buildstock configuration file as a dictionary object.

Returns

dict
The buildstock configuration file.
Expand source code
def get_cfg(self) -> dict:
    """Get the buildstock configuration file as a dictionary object.

    Returns:
        dict: The buildstock configuration file.
    """
    with open(self.yaml_file) as f:
        config = yaml.load(f, Loader=yaml.SafeLoader)
    return config
def get_overlap_report(self)

For all the option in each upgrade, verify that there are no overlapping selection.

Expand source code
def get_overlap_report(self):
    """
    For all the option in each upgrade, verify that there are no overlapping selection.
    """
    upgrade_options_map = self.get_upgrade_options_map()
    overlap_report = ""
    for (upgrade_num, upgrade_name), options in upgrade_options_map.items():
        for option in options:
            print(f"Verifying upgrade {upgrade_num} ({upgrade_name}), {option}")
            opt2logic = self.get_apply_logics(upgrade_num, option)
            for opt1, opt2 in it.combinations(opt2logic.keys(), 2):
                overlap = self.get_overlapping_selections(opt2logic[opt1], opt2logic[opt2])
                if overlap:
                    overlap_report += f"Upgrade {upgrade_num} ({upgrade_name}),\
                                        has overlapping selections for {opt1} and {opt2}:\n"
                    overlap_report += json.dumps(overlap, indent=2) + "\n"
    return overlap_report
def get_overlapping_selections(self, logic1, logic2)

Get the selections that are common to both logic1 and logic2

Expand source code
def get_overlapping_selections(self, logic1, logic2):
    """
    Get the selections that are common to both logic1 and logic2
    """
    selections = self.prase_logic({"and": [logic1, logic2]})
    if len(selections) == 0:
        return None
    selections = self.clean_selections(selections)
    return self.retrieve_logic(selections)
def get_upgrade_options_map(self)

Get list of all options for all upgrades

Expand source code
def get_upgrade_options_map(self):
    """Get list of all options for all upgrades"""
    config = self.get_cfg()
    upgrade_options_map = defaultdict(set)
    for upgrade_num, upgrade in enumerate(config["upgrades"], start=1):
        for opt in upgrade["options"]:
            para, _ = self._get_para_option(opt["option"])
            upgrade_options_map[(upgrade_num, upgrade['upgrade_name'])].add(para)
    return upgrade_options_map
def inverse_selection(self, selection: dict)
Expand source code
def inverse_selection(self, selection: dict):
    return list({k: self.available_opts[k] - v - {"Void"}} for k, v in selection.items())
def normalize_logic(self, logic)
Expand source code
def normalize_logic(self, logic):
    selections = self.prase_logic(logic)
    return self.retrieve_logic(selections)
def not_(self, logic1)

Merge two dicts. If there is a conflict, the value in dict2 takes precedence.

Expand source code
def not_(self, logic1):
    """
    Merge two dicts. If there is a conflict, the value in dict2 takes precedence.
    """
    prarsed_logic = self.prase_logic(logic1)
    return_val = list(reduce(self.and_, [self.inverse_selection(selection) for selection in prarsed_logic]))
    return return_val
def prase_logic(self, logic)

Convert the parameter|option logic in the yaml file into the format the apply upgrade measure understands.

:param logic: dict, list, or string with downselection logic in it :returns: str of logic

Expand source code
def prase_logic(self, logic):
    """
    Convert the parameter|option logic in the yaml file into the format the apply upgrade measure understands.

    :param logic: dict, list, or string with downselection logic in it
    :returns: str of logic
    """
    logic = self._normalize_lists(logic)
    if isinstance(logic, dict):
        assert (len(logic) == 1)
        key = list(logic.keys())[0]
        val = logic[key]
        val = [val] if not isinstance(val, list) else val
        if key == 'and':
            and_result = list(reduce(self.and_, (self.prase_logic(block) for block in val)))
            return self.clean_selections(and_result)
        elif key == 'or':
            or_result = list(reduce(self.or_, (self.prase_logic(block) for block in val)))
            return self.clean_selections(or_result)
        elif key == 'not':
            and_result = list(reduce(self.and_, (self.prase_logic(block) for block in val)))
            not_val = list(reduce(self.and_, [self.inverse_selection(selection) for selection in and_result]))
            return self.clean_selections(not_val)
    elif isinstance(logic, list):
        list_val = list(reduce(self.and_, (self.prase_logic(block) for block in logic)))
        return self.clean_selections(list_val)
    elif isinstance(logic, str):
        para, option = self._get_para_option(logic)
        return [{para: {option}}]
    raise ValueError(f"Logic {logic} is invalid")
def retrieve_logic(self, selections: list[dict[str, set[str]]])

return back the logic from the selections

Expand source code
def retrieve_logic(self, selections: list[dict[str, set[str]]]):
    """
    return back the logic from the selections
    """
    outer_dict = {'or': []}
    for selection in selections:
        inner_dict = {'and': []}
        for key, values in selection.items():
            if len(values) > 1:
                inner_dict['and'].append({'or': [f"{key}|{value}" for value in values]})
            elif len(values) == 1:
                inner_dict['and'].append(f"{key}|{list(values)[0]}")
            else:
                raise ValueError(f"Selection {selection} has no valid value for {key}")
        if len(inner_dict['and']) == 1:
            outer_dict['or'].append(inner_dict['and'][0])
        else:
            outer_dict['or'].append(inner_dict)
    if len(outer_dict['or']) == 1:
        return outer_dict['or'][0]
    return outer_dict