Source code for buildingmotif.template_matcher

"""
Using the VF2 algorithm to compute subgraph isomorphisms between a template T and a graph G.
If the found isomorphism is a subgraph of T, then T is not fully matched and additional
input is required to fully populate the template.
"""
from collections import defaultdict
from itertools import combinations, permutations
from typing import TYPE_CHECKING, Callable, Dict, Generator, List, Optional, Set, Tuple

import networkx as nx  # type: ignore
from networkx.algorithms.isomorphism import DiGraphMatcher  # type: ignore
from rdflib import Graph, URIRef
from rdflib.extras.external_graph_libs import rdflib_to_networkx_digraph
from rdflib.term import Node

from buildingmotif.namespaces import OWL, PARAM, RDF, RDFS
from buildingmotif.utils import copy_graph

if TYPE_CHECKING:
    from buildingmotif.dataclasses.template import Template

Mapping = Dict[Node, Node]


# used to accelerate monomorphism search
# outer key is the address of the ontology graph
# inner map is from a class to its parent classes
class _ontology_lookup_cache:
    sc_cache: Dict[int, Dict[Node, Set[Node]]]
    t_cache: Dict[int, Dict[Node, Set[Node]]]
    in_cache: Dict[int, Dict[Node, bool]]

    def __init__(self):
        self.sc_cache = {}
        self.sp_cache = {}
        self.t_cache = {}
        self.in_cache = {}

    def parents(self, ntype: Node, ontology: Graph) -> Set[Node]:
        if id(ontology) not in self.sc_cache:
            self.sc_cache[id(ontology)] = {}
        cache = self.sc_cache[id(ontology)]
        # populate cache if necessary
        if ntype not in cache:
            cache[ntype] = set(ontology.transitive_objects(ntype, RDFS.subClassOf))  # type: ignore
        return cache[ntype]

    def superproperties(self, ntype: Node, ontology: Graph) -> Set[Node]:
        if id(ontology) not in self.sp_cache:
            self.sp_cache[id(ontology)] = {}
        cache = self.sp_cache[id(ontology)]
        # populate cache if necessary
        if ntype not in cache:
            cache[ntype] = set(ontology.transitive_objects(ntype, RDFS.subPropertyOf))
        return cache[ntype]

    def types(self, node: Node, graph: Graph) -> Set[URIRef]:
        if id(graph) not in self.t_cache:
            self.t_cache[id(graph)] = {}
        cache = self.t_cache[id(graph)]
        # populate cache if necessary
        if node not in cache:
            cache[node] = set(graph.objects(node, RDF.type))  # type: ignore
        if len(cache[node]) == 0:
            cache[node] = {OWL.NamedIndividual}
        return cache[node]  # type: ignore

    def defined_in(self, node: Node, graph: Graph) -> bool:
        if id(graph) not in self.in_cache:
            self.in_cache[id(graph)] = {}
        cache = self.in_cache[id(graph)]
        # populate cache if necessary
        if node not in cache:
            cache[node] = (node, RDF.type, OWL.Class) in graph
        return cache[node]


def _get_types(n: Node, g: Graph, _cache: _ontology_lookup_cache) -> Set[URIRef]:
    """
    Types for a node should only be URIRefs, so the
    type filtering here should be safe.
    Defaults to owl:NamedIndividual as a "root" type
    """
    return _cache.types(n, g)


def _compatible_types(
    n1: Node,
    g1: Graph,
    n2: Node,
    g2: Graph,
    ontology: Graph,
    _cache: _ontology_lookup_cache,
) -> bool:
    """
    Returns true if the two terms have covariant types.

    :param n1: First node
    :type n1: Node
    :param g1: The graph containing the first node's types
    :type g1: Graph
    :param n2: Second node
    :type n2: Node
    :param g2: The graph containing the second node's types
    :type g2: Graph
    :param ontology: The ontology graph that defines the class hierarchy
    :type ontology: Graph
    :return: True if the nodes are semantically compatible, false otherwise
    :rtype: bool
    """
    n1types = _get_types(n1, g1, _cache)
    n2types = _get_types(n2, g2, _cache)
    # check if these are properties; if so, use subPropertyOf, not subClassOf
    property_types = {OWL.ObjectProperty, OWL.DatatypeProperty}
    if property_types.intersection(n1types) and property_types.intersection(n2types):
        for n1type in _get_types(n1, g1, _cache):
            for n2type in _get_types(n2, g2, _cache):

                # check if types are covariant
                if n2type in _cache.superproperties(
                    n1type, ontology
                ) or n1type in _cache.superproperties(n2type, ontology):
                    return True
    else:
        for n1type in _get_types(n1, g1, _cache):
            for n2type in _get_types(n2, g2, _cache):

                # check if types are covariant
                if n2type in _cache.parents(
                    n1type, ontology
                ) or n1type in _cache.parents(n2type, ontology):
                    return True
    return False


[docs]def get_semantic_feasibility( G1: Graph, G2: Graph, ontology: Graph, _cache: _ontology_lookup_cache ) -> Callable[[Node, Node], bool]: """Returns a function that checks if two nodes are semantically feasible to be matched given the information in the provided ontology. The function returns true if the two nodes are semantically feasible to be matched. We use the following checks: 1. If the two nodes are both classes, one must be a subclass of the other. 2. If the two nodes are instances, they must be of the same class. :param G1: graph 1 :type G1: Graph :param G2: graph 2 :type G2: Graph :param ontology: ontology graph :type ontology: Graph :param _cache: ontology lookup cache :type _cache: _ontology_lookup_cache :return: function that checks if two nodes are semantically feasible :rtype: Callable[[Node, Node], bool] """ # TODO: other checks? def semantic_feasibility(n1: Node, n2: Node) -> bool: """Checks if two nodes are semantically feasible to be matched. :param n1: node 1 :type n1: Node :param n2: node 2 :type n2: Node :return: true if nodes are feasible, false if not :rtype: bool """ # case 0: same node if n1 == n2: return True # case 1: both are classes if _cache.defined_in(n1, ontology) and _cache.defined_in(n2, ontology): if n2 in _cache.parents(n1, ontology): return True elif n1 in _cache.parents(n2, ontology): return True else: return False # case 2: both are instances if _compatible_types(n1, G1, n2, G2, ontology, _cache): return True return False return semantic_feasibility
class _VF2SemanticMatcher(DiGraphMatcher): """ A subclass of DiGraphMatcher that incorporates semantic feasibility into the matching process using the provided ontology. :param T: template graph :param G: building graph :param ontology: ontology that contains the information about node semantics """ def __init__(self, T: Graph, G: Graph, ontology: Graph): super().__init__(rdflib_to_networkx_digraph(T), rdflib_to_networkx_digraph(G)) self.ontology = ontology self._cache = _ontology_lookup_cache() self._semantic_feasibility = get_semantic_feasibility( T, G, ontology, self._cache ) def semantic_feasibility(self, g1: Node, g2: Node) -> bool: """ Returns true if the two nodes are semantically feasible to be matched. """ val = self._semantic_feasibility(g1, g2) return val
[docs]def generate_all_subgraphs(T: Graph) -> Generator[Graph, None, None]: """Generates all node-induced subgraphs of T in order of decreasing size. We generate subgraphs in decreasing order of size because we want to find the largest subgraph as part of the monomorphism search process. :param T: template graph :type T: Graph :yield: subgraphs :rtype: Generator[Graph, None, None] """ # no monomorphism will be larger than the number of distinct nodes in the graph largest_sg_size = len(T.all_nodes()) for nodecount in range(largest_sg_size, 1, -1): for nodelist in combinations(T.all_nodes(), nodecount): yield digraph_to_rdflib(rdflib_to_networkx_digraph(T).subgraph(nodelist))
[docs]def digraph_to_rdflib(digraph: nx.DiGraph) -> Graph: """Turns a `nx.DiGraph` into an `rdflib.Graph`. Expects the nx.DiGraph to have been produced by :py:func:`rdflib_to_networkx_digraph`. :param digraph: directed graph :type digraph: nx.DiGraph :return: RDF graph :rtype: Graph """ g = Graph() for s, o, pdict in digraph.edges(data=True): g.add((s, pdict["triples"][0][1], o)) return g
[docs]class TemplateMatcher: """Computes the set of subgraphs of G that are monomorphic to T; these are organized by how "complete" the monomorphism is. """ mappings: Dict[int, List[Mapping]] template: "Template" building: Graph template_bindings: Dict[str, Node] template_graph: Graph def __init__( self, building: Graph, template: "Template", ontology: Graph, graph_target: Optional[Node] = None, ): self.mappings = defaultdict(list) self.template_bindings = {} self.template = template self.building = building self.ontology = ontology self.graph_target = graph_target self.template_graph = copy_graph(template.body) self.template_parameters: Set[Node] = { PARAM[p] for p in self.template.parameters } self._generate_mappings() def _generate_mappings(self): for template_subgraph in generate_all_subgraphs(self.template_graph): matching = _VF2SemanticMatcher( self.building, template_subgraph, self.ontology ) if matching.subgraph_is_monomorphic(): for sg in matching.subgraph_monomorphisms_iter(): # skip if the subgraph does not contain the graph node we care about if self.graph_target and self.graph_target not in sg.keys(): continue # sg is a mapping from building graph nodes to template nodes # that constitutes a monomorphism. if len(sg) == 0: continue # TODO: Limit mappings to those that include all of the params? # TODO: ignore optional parameters? # TODO: require that 'name' is in the parameters? # if there is an overlap, yield the subgraph if set(sg.values()).intersection(self.template_parameters): self.add_mapping(sg)
[docs] def add_mapping(self, mapping: Mapping): """Adds a mapping to the set of mappings. :param mapping: mapping :type mapping: Mapping """ if mapping not in self.mappings[len(mapping)]: self.mappings[len(mapping)].append(mapping)
@property def largest_mapping_size(self) -> int: """Returns the size of the largest mapping. :return: size of largest mapping :rtype: int """ return max(self.mappings.keys())
[docs] def building_subgraph_from_mapping(self, mapping: Mapping) -> Graph: """Returns the subgraph of the building graph that corresponds to the given mapping. :param mapping: mapping :type mapping: Mapping :return: subgraph :rtype: Graph """ g = rdflib_to_networkx_digraph(self.building) edges = permutations(mapping.keys(), 2) sg = g.edge_subgraph(edges) return digraph_to_rdflib(sg)
[docs] def template_subgraph_from_mapping(self, mapping: Mapping) -> Graph: """Returns the subgraph of the template graph that corresponds to the given mapping. :param mapping: mapping :type mapping: Mapping :return: subgraph :rtype: Graph """ # TODO: need to keep the edges that are more generic than what we have inside the graph. # For example, if the building has (x a brick:AHU) then we don't need to remind them to # add an edge (x a brick:Equipment) because that is redundant g = rdflib_to_networkx_digraph(self.template_graph) return digraph_to_rdflib(g.subgraph(mapping.values()))
[docs] def remaining_template_graph(self, mapping: Mapping) -> Graph: """Returns the remaining template graph to be filled out given a mapping. :param mapping: mapping :type mapping: Mapping :return: remaining template graph to be filled out :rtype: Graph """ sg = self.template_subgraph_from_mapping(mapping) return self.template_graph - sg
[docs] def remaining_template(self, mapping: Mapping) -> Optional["Template"]: """Returns the remaining template to be filled out given a mapping. :param mapping: mapping :type mapping: Mapping :return: remaining template to be filled out :rtype: Optional[Template] """ # if all parameters are fulfilled by the mapping, then return None mapping = {k: v for k, v in mapping.items() if str(v) in PARAM} mapped_params: Set[Node] = {v for v in mapping.values()} if not self.template_parameters - mapped_params: # return self.building_subgraph_from_mapping(mapping) return None bindings = {} for building_node, param in mapping.items(): if param is not None: bindings[str(param)[len(PARAM) :]] = building_node # this *should* be a template because we don't have bindings for all of # the template's parameters res = self.template.evaluate(bindings) assert not isinstance(res, Graph) return res
[docs] def mappings_iter(self, size=None) -> Generator[Mapping, None, None]: """Returns an iterator over all of the mappings of the given size. If size is None, then all mappings are returned in descending order of the size of the mapping. This means the most complete mappings will be returned first. :param size: size, defaults to None :type size: int, optional :yield: mapping iterator :rtype: Generator[Mapping, None, None] """ if size is None: for size in sorted(self.mappings.keys(), reverse=True): for mapping in self.mappings[size]: if not mapping: continue yield mapping else: for mapping in self.mappings[size]: if not mapping: continue yield mapping
[docs] def building_mapping_subgraphs_iter( self, size=None, ) -> Generator[Tuple[Mapping, Graph], None, None]: """Returns an iterator over all of the subgraphs with a mapping of the given size. If size is None, then all mappings are returned in descending order of the size of the mapping. This means the most complete subgraphs will be returned first. :param size: size, defaults to None :type size: int, optional :yield: mapping and subgraph iterator :rtype: Generator[Tuple[Mapping, Graph], None, None] """ cache = set() for mapping in self.mappings_iter(size): subgraph = self.building_subgraph_from_mapping(mapping) if not subgraph.connected(): continue key = tuple(sorted(subgraph.all_nodes())) # type: ignore if key in cache: continue cache.add(key) yield mapping, subgraph