# Source code for buildingmotif.dataclasses.shape_collection

import logging
import random
import string
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union

import rdflib
from pyshacl.helper.path_helper import shacl_path_to_sparql_path
from rdflib import RDF, RDFS, Graph, URIRef
from rdflib.paths import ZeroOrMore, ZeroOrOne
from rdflib.term import Node

from buildingmotif import get_building_motif
from buildingmotif.namespaces import BMOTIF, OWL, SH
from buildingmotif.utils import Triple, copy_graph

if TYPE_CHECKING:
    from buildingmotif import BuildingMOTIF

# Path to the BuildingMOTIF ontology file: it lives in the "resources"
# directory that sits next to this module's parent directory.
ONTOLOGY_FILE = Path(__file__).resolve().parents[1].joinpath(
    "resources", "building_motif_ontology.ttl"
)
# Parsed once when the module is imported; the ShapeCollection classmethods
# that walk rdfs:subClassOf / bmotif:includes read from this graph.
ontology = rdflib.Graph().parse(ONTOLOGY_FILE)


@dataclass
class ShapeCollection:
    """This class mirrors :py:class:`database.tables.DBShapeCollection`.

    It pairs the database id of a shape collection with the RDF graph that
    holds its contents and the BuildingMOTIF instance used to reach storage.
    """

    _id: int
    graph: rdflib.Graph
    _bm: "BuildingMOTIF"

    @classmethod
    def create(cls) -> "ShapeCollection":
        """Create a new ShapeCollection.

        :return: new ShapeCollection
        :rtype: ShapeCollection
        """
        bm = get_building_motif()
        db_shape_collection = bm.table_connection.create_db_shape_collection()
        graph = bm.graph_connection.create_graph(
            db_shape_collection.graph_id, rdflib.Graph()
        )

        return cls(_id=db_shape_collection.id, graph=graph, _bm=bm)

    @classmethod
    def load(cls, id: int) -> "ShapeCollection":
        """Get ShapeCollection from database by id.

        :param id: ShapeCollection id
        :type id: int
        :return: ShapeCollection
        :rtype: ShapeCollection
        """
        bm = get_building_motif()
        db_shape_collection = bm.table_connection.get_db_shape_collection(id)
        graph = bm.graph_connection.get_graph(db_shape_collection.graph_id)

        return cls(_id=db_shape_collection.id, graph=graph, _bm=bm)

    @property
    def id(self) -> Optional[int]:
        """The database id of this ShapeCollection (read-only)."""
        return self._id

    @id.setter
    def id(self, new_id):
        raise AttributeError("Cannot modify db id")

    @property
    def graph_name(self) -> Optional[URIRef]:
        """
        Returns the name of the graph (subject of "a owl:Ontology") if one exists
        """
        # will be None if this is not found
        return self.graph.value(predicate=RDF.type, object=OWL.Ontology)  # type: ignore

    def add_triples(self, *triples: Triple) -> None:
        """Add the given triples to the graph.

        :param triples: a sequence of triples to add to the graph
        :type triples: Triple
        """
        for triple in triples:
            self.graph.add(triple)

    def add_graph(self, graph: rdflib.Graph) -> None:
        """Add the given graph to the ShapeCollection.

        :param graph: the graph to add to the ShapeCollection
        :type graph: rdflib.Graph
        """
        self.graph += graph

    def _cbd(self, shape_name, self_contained=True):
        """Retrieves the Concise Bounded Description (CBD) of the shape.

        :param shape_name: the node whose CBD is computed
        :param self_contained: if True, keep unioning in the CBDs of every
            node found in the result until no new triples appear,
            defaults to True
        :return: a graph containing the (optionally self-contained) CBD
        :rtype: rdflib.Graph
        """
        cbd = self.graph.cbd(shape_name)
        # if computing self-contained, do the fixed-point computation produced by unioning
        # the CBDs of all nodes in the current CBD until the graph does not change
        changed = True
        while self_contained and changed:
            new_g = rdflib.Graph()
            for node in cbd.all_nodes():
                new_g += self.graph.cbd(node)
            new_cbd = new_g + cbd
            # Compare triple counts on both sides. The previous code compared
            # an int against the Graph object itself, which never reported
            # growth, so the loop stopped after a single pass.
            changed = len(new_cbd) > len(cbd)
            cbd = new_cbd
        return cbd

    def resolve_imports(
        self, recursive_limit: int = -1, error_on_missing_imports: bool = True
    ) -> "ShapeCollection":
        """Resolves `owl:imports` to as many levels as requested.

        By default, all `owl:imports` are recursively resolved. This limit can
        be changed to 0 to suppress resolving imports, or to 1..n to handle
        recursion up to that limit.

        :param recursive_limit: how many levels of `owl:imports` to resolve,
            defaults to -1 (all)
        :type recursive_limit: int, optional
        :param error_on_missing_imports: if True, raises an error if any of the
            dependency ontologies are missing (i.e. they need to be loaded into
            BuildingMOTIF), defaults to True
        :type error_on_missing_imports: bool, optional
        :return: a new ShapeCollection with the types resolved
        :rtype: ShapeCollection
        """
        resolved_namespaces: Set[rdflib.URIRef] = set()
        resolved = _resolve_imports(
            self.graph,
            recursive_limit,
            resolved_namespaces,
            error_on_missing_imports=error_on_missing_imports,
        )
        new_sc = ShapeCollection.create()
        new_sc.add_graph(resolved)
        return new_sc

    @classmethod
    def _get_subclasses_of_definition_type(
        cls, definition_type: URIRef
    ) -> List[URIRef]:
        """Get all the definition types in the ontology that are subclasses in
        the given definition types.

        :param definition_type: the given definition type
        :type definition_type: URIRef
        :return: list of included definition types
        :rtype: List[URIRef]
        """
        children = ontology.subjects(RDFS.subClassOf, definition_type)
        # the type itself comes first, followed by each subtree of subclasses
        results = [definition_type]
        for child in children:
            results += cls._get_subclasses_of_definition_type(child)
        return results

    @classmethod
    def _get_included_domains(cls, domain: URIRef) -> List[URIRef]:
        """Get all the domains in the ontology that are included in the given
        domains.

        :param domain: the given domain
        :type domain: URIRef
        :return: list of included domains
        :rtype: List[URIRef]
        """
        children = ontology.subjects(BMOTIF.includes, domain)
        # the domain itself comes first, followed by everything reachable
        # through bmotif:includes
        results = [domain]
        for child in children:
            results += cls._get_included_domains(child)
        return results

    def get_shapes_of_definition_type(
        self, definition_type: URIRef, include_labels=False
    ) -> Union[List[URIRef], List[Tuple[URIRef, str]]]:
        """Get subjects present in shape of the definition type.

        :param definition_type: desired definition type
        :type definition_type: URIRef
        :param include_labels: if True, return ``(shape, label)`` tuples where
            the label is the shape's ``rdfs:label`` value in this graph (None
            when absent), defaults to False
        :type include_labels: bool, optional
        :return: subjects
        :rtype: List[URIRef]
        """
        definition_types = self._get_subclasses_of_definition_type(definition_type)

        results = []
        # a distinct loop name keeps the `definition_type` argument intact
        for def_type in definition_types:
            instances = self.graph.subjects(RDF.type, def_type)
            if include_labels:
                results += [
                    (shape, self.graph.value(shape, RDFS.label))
                    for shape in instances
                ]
            else:
                results += instances

        return results

    def get_shapes_of_domain(self, domain: URIRef) -> List[URIRef]:
        """Get subjects present in shape of domain type.

        :param domain: desired domain
        :type domain: URIRef
        :return: subjects
        :rtype: List[URIRef]
        """
        included_domains = self._get_included_domains(domain)

        results = []
        # a distinct loop name keeps the `domain` argument intact
        for included_domain in included_domains:
            results += self.graph.subjects(RDF.type, included_domain)

        return results

    def get_shapes_about_class(
        self, rdf_type: URIRef, contexts: Optional[List["ShapeCollection"]] = None
    ) -> List[URIRef]:
        """Returns a list of shapes that either target the given class (or
        superclasses of it), or otherwise only apply to URIs of the given type.

        :param rdf_type: an OWL class
        :type rdf_type: URIRef
        :param contexts: list of ShapeCollections that help determine the class
            structure
        :type contexts: List["ShapeCollection"], optional
        :return: a list of shapes in this ShapeCollection that concern that class
        :rtype: List[URIRef]
        """
        # merge the contexts together w/ our graph if they are provided, else
        # just use the existing shape collection graph
        if contexts is not None:
            context = sum((sc.graph for sc in contexts), rdflib.Graph())
            graph = self.graph + context
        else:
            graph = self.graph
        rows = graph.query(
            f"""
            PREFIX sh: <http://www.w3.org/ns/shacl#>
            PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
            PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
            SELECT ?shape WHERE {{
                ?shape a sh:NodeShape .
                {rdf_type.n3()} rdfs:subClassOf* ?class .
                {{ ?shape sh:targetClass ?class }}
                UNION
                {{ ?shape sh:class ?class }}
            }}"""
        )
        return [row[0] for row in rows]  # type: ignore

    def shape_to_query(self, shape: URIRef) -> str:
        """
        This method takes a URI representing a SHACL shape as an argument and
        returns a SPARQL query selecting the information which would be used to
        satisfy that SHACL shape. This uses the following rules:

        - `<shape> sh:targetClass <class>` ->
            `?target rdf:type/rdfs:subClassOf* <class>`
        - `<shape> sh:property [ sh:path <path>; sh:class <class>; sh:name <name> ]` ->
            ?target <path> ?name . ?name rdf:type/rdfs:subClassOf* <class>
        - `<shape> sh:property [ sh:path <path>; sh:hasValue <value>]` ->
            ?target <path> <value>

        :param shape: the URI of the SHACL shape in this collection's graph
        :type shape: URIRef
        :return: the text of the SPARQL SELECT query
        :rtype: str
        """
        clauses, project = _shape_to_where(self.graph, shape, "?target")
        preamble = """PREFIX sh: <http://www.w3.org/ns/shacl#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
"""
        return f"{preamble} SELECT {' '.join(project)} WHERE {{\n{clauses}\n}}"
def _is_list(graph: Graph, node: Node):
    """Return True if ``node`` is the subject of an ``rdf:first`` triple in
    ``graph`` (i.e. it is a cell of an RDF list)."""
    return (node, RDF.first, None) in graph


def _target_to_sparql(graph: Graph, nodeshape: Node, root_var: str = "?target") -> str:
    """
    Takes the nodeshape and returns the SPARQL query that would be used to
    find the target nodes of that nodeshape. This is a helper function for
    _shape_to_where

    Handles:
    - targetClass
    - targetSubjectsOf
    - targetObjectsOf
    - targetNode

    If there is more than one of these clauses on the nodeshape, they are
    combined with a UNION.

    Returns the string of the query.
    """
    # get all the clauses for the targetClass
    targetClasses = graph.objects(nodeshape, SH.targetClass)
    tc_clauses = [
        f"{root_var} rdf:type/rdfs:subClassOf* {tc.n3()} .\n" for tc in targetClasses  # type: ignore
    ]

    # get all the clauses for the targetSubjectsOf
    targetSubjectsOf = graph.objects(nodeshape, SH.targetSubjectsOf)
    tso_clauses = [
        f"{root_var} {tso.n3()} ?ignore .\n" for tso in targetSubjectsOf  # type: ignore
    ]

    # get all the clauses for the targetObjectsOf
    targetObjectsOf = graph.objects(nodeshape, SH.targetObjectsOf)
    too_clauses = [
        f"?ignore {too.n3()} {root_var} .\n" for too in targetObjectsOf  # type: ignore
    ]

    # get all the clauses for the targetNode
    targetNode = list(graph.objects(nodeshape, SH.targetNode))
    tn_clauses = [
        f"BIND({tn.n3()} AS {root_var}) .\n" for tn in targetNode  # type: ignore
    ]

    # combine all the clauses with a UNION
    all_clauses = tc_clauses + tso_clauses + too_clauses + tn_clauses
    return " UNION ".join(f"{{ {clause} }}" for clause in all_clauses)


def _clauses_on_nodeshape(
    graph: Graph, nodeshape: Node, root_variable: str = "?target"
) -> str:
    """handles the constraint components on a node shape (other than
    targetClass, targetSubjectsOf, targetObjectsOf, targetNode).

    Builds up the SPARQL query for the given node shape, starting with the
    given root variable. Only ``sh:class`` is handled at present.
    """
    clauses = []

    # handle sh:class
    for class_constraint in graph.objects(nodeshape, SH["class"]):
        clauses.append(
            f"{root_variable} rdf:type/rdfs:subClassOf* {class_constraint.n3()} .\n"
        )

    return " ".join(clauses)


def _shape_to_where(
    graph: Graph, shape: URIRef, root_var: str = "?target"
) -> Tuple[str, List[str]]:
    """Build the body of a SPARQL WHERE clause for a SHACL shape.

    :param graph: the graph containing the shape definition
    :param shape: the shape to translate
    :param root_var: the SPARQL variable standing for the shape's focus node,
        defaults to "?target"
    :return: the text of the WHERE clauses and the list of variables to
        project in the SELECT
    :rtype: Tuple[str, List[str]]
    """
    # we will build the query as a string
    clauses: str = ""
    # build up the SELECT clause as a set of vars
    project: Set[str] = {root_var}

    # local state for generating unique variable names
    prefix = "".join(random.choice(string.ascii_lowercase) for _ in range(2))
    variable_counter = 0

    def gensym():
        nonlocal variable_counter
        varname = f"{prefix}{variable_counter}"
        variable_counter += 1
        return varname

    def var_for(pshape: Node) -> str:
        # Name the variable after the property shape's sh:name / rdfs:label
        # when present, otherwise generate one. Spaces are not legal in a
        # SPARQL variable name, so they are replaced wherever a name is built.
        base = graph.value(pshape, SH.name | RDFS.label) or gensym()
        return f"?{base}".replace(" ", "_")

    # get all the target clauses
    clauses += _target_to_sparql(graph, shape, root_var)
    clauses += _clauses_on_nodeshape(graph, shape, root_var)

    # find all of the non-qualified property shapes. All of these will use the same variable
    # for all uses of the same sh:path value
    pshapes_by_path: Dict[Node, List[Node]] = defaultdict(list)
    qualified_pshapes: Set[Node] = set()
    for pshape in graph.objects(shape, SH.property):
        path = shacl_path_to_sparql_path(graph, graph.value(pshape, SH.path))
        if not graph.value(pshape, SH.qualifiedValueShape):
            pshapes_by_path[path].append(pshape)  # type: ignore
        else:
            qualified_pshapes.add(pshape)

    # look at pshapes implicitly defined by sh:path
    for pshape in graph.subjects(predicate=SH.path):
        if (
            pshape == shape
        ):  # skip the input 'shape', otherwise this will infinitely recurse
            continue
        path = shacl_path_to_sparql_path(graph, graph.value(pshape, SH.path))
        if not graph.value(pshape, SH.qualifiedValueShape):
            pshapes_by_path[path].append(pshape)  # type: ignore
        else:
            qualified_pshapes.add(pshape)

    # shapes pulled in with sh:node constrain the same focus node
    for dep_shape in graph.objects(shape, SH.node):
        dep_clause, dep_project = _shape_to_where(graph, dep_shape, root_var)  # type: ignore
        clauses += dep_clause
        project.update(dep_project)

    # each sh:or list becomes a UNION of the translations of its members
    for or_clause in graph.objects(shape, SH["or"]):
        items = list(graph.objects(or_clause, (RDF.rest * ZeroOrMore) / RDF.first))  # type: ignore
        or_parts = []
        for item in items:
            or_body, or_project = _shape_to_where(graph, item, root_var)  # type: ignore
            or_parts.append(or_body)
            project.update(or_project)
        clauses += " UNION ".join(f"{{ {or_body} }}" for or_body in or_parts)

    # 'pshapes_by_path' maps a path to all of the property shapes that use that path on the target
    # assign a unique variable for each sh:path w/o a qualified shape
    pshape_vars: Dict[Node, str] = {}
    for pshape_list in pshapes_by_path.values():
        # get name if it exists, otherwise generate a new one
        varname = var_for(pshape_list[0])
        for pshape in pshape_list:
            pshape_vars[pshape] = varname

    for pshape in graph.objects(shape, SH.property):
        # get the varname if we've already assigned one for this pshape above,
        # or generate a new one. When generating a name, use the SH.name field
        # in the PropertyShape or generate a unique one
        name = pshape_vars.get(pshape) or var_for(pshape)
        path = shacl_path_to_sparql_path(graph, graph.value(pshape, SH.path))
        qMinCount = graph.value(pshape, SH.qualifiedMinCount) or 0

        pclass = graph.value(
            pshape, (SH["qualifiedValueShape"] * ZeroOrOne / SH["class"])  # type: ignore
        )
        if pclass:
            clause = f"{root_var} {path} {name} .\n {name} rdf:type/rdfs:subClassOf* {pclass.n3()} .\n"
            # a qualified min count of 0 means the value is not required
            if qMinCount == 0:
                clause = f"OPTIONAL {{ {clause} }} .\n"
            clauses += clause
            project.add(name)

        pnode = graph.value(
            pshape, (SH["qualifiedValueShape"] * ZeroOrOne / SH["node"])  # type: ignore
        )
        if pnode:
            node_clauses, node_project = _shape_to_where(graph, pnode, root_var)  # type: ignore
            clause = f"{root_var} {path} {name} .\n"
            # the nested shape describes the value node, so re-root its
            # clauses and projected variables onto this property's variable
            clause += node_clauses.replace(root_var, name)
            if qMinCount == 0:
                clause = f"OPTIONAL {{ {clause} }}"
            clauses += clause
            project.update({p.replace(root_var, name) for p in node_project})

        or_values = graph.value(
            pshape, (SH["qualifiedValueShape"] * ZeroOrOne / SH["or"])  # type: ignore
        )
        if or_values:
            # or clauses share the variable name. Get the variable name from the SH.name
            # or RDFS.label for the current pshape, or generate a new one
            or_var = var_for(pshape)
            # connect ?target to the variable that will be used in the OR clauses
            clauses += f"{root_var} {path} {or_var} .\n"
            items = list(graph.objects(or_values, (RDF.rest * ZeroOrMore) / RDF.first))  # type: ignore
            or_parts = []
            for item in items:
                or_body, or_project = _shape_to_where(graph, item, or_var)  # type: ignore
                or_parts.append(or_body)
                project.update(or_project)
            clauses += " UNION ".join(f"{{ {or_body} }}" for or_body in or_parts)

        pvalue = graph.value(pshape, SH.hasValue)
        # Test for presence rather than truthiness: a literal such as 0,
        # false or "" is falsy but is still a valid sh:hasValue.
        if pvalue is not None:
            clauses += f"{root_var} {path} {pvalue.n3()} .\n"

        # no specific constraint handled above: just bind the value
        if not pclass and not pnode and not or_values and pvalue is None:
            clauses += f"{root_var} {path} {name} .\n"

    return clauses, list(project)


def _resolve_imports(
    graph: rdflib.Graph,
    recursive_limit: int,
    seen: Set[rdflib.URIRef],
    error_on_missing_imports: bool = True,
) -> rdflib.Graph:
    """Return a copy of ``graph`` merged with the graphs it ``owl:imports``.

    Each imported name is looked up first among the Libraries, then among the
    stored ShapeCollections (matched on ``graph_name``).

    :param graph: the graph whose ``owl:imports`` should be resolved
    :param recursive_limit: how many levels of imports to follow; 0 returns
        ``graph`` itself unchanged, a negative value never reaches 0 and so
        imposes no limit
    :param seen: names that have already been handled; updated in place so
        each import is processed at most once
    :param error_on_missing_imports: if True, raise when an import cannot be
        found, otherwise skip it, defaults to True
    :raises Exception: if an import cannot be resolved and
        ``error_on_missing_imports`` is True
    :return: the graph with resolved imports merged in
    """
    # imported inside the function rather than at module top
    # (presumably to avoid a circular import -- TODO confirm)
    from buildingmotif.dataclasses.library import Library

    bm = get_building_motif()

    logger = logging.getLogger(__name__)

    if recursive_limit == 0:
        return graph
    new_g = copy_graph(graph)
    for imported in graph.objects(predicate=OWL.imports):
        if imported in seen:
            continue
        seen.add(imported)

        # go find the graph definition from our libraries
        try:
            lib = Library.load(name=imported)
            sc_to_add = lib.get_shape_collection()
        except Exception as e:
            logger.warning(
                "Could not resolve import of %s from Libraries (%s). Trying shape collections",
                imported,
                e,
            )
            sc_to_add = None

        # search through our shape collections for a graph with the provided name
        if sc_to_add is None:
            for shape_collection in bm.table_connection.get_all_db_shape_collections():
                sc = ShapeCollection.load(shape_collection.id)
                if sc.graph_name == imported:
                    sc_to_add = sc
                    break
            # NOTE(review): this warning fires after the search above has
            # already run, whether or not it found a match -- confirm whether
            # it is still wanted.
            logger.warning(
                "Could not resolve import of %s from Libraries. Trying shape collections",
                imported,
            )

        if sc_to_add is None:
            if error_on_missing_imports:
                # build the message explicitly: Exception does not apply
                # %-style formatting to its arguments
                raise Exception(f"Could not resolve import of {imported}")
            continue

        dependency = _resolve_imports(
            sc_to_add.graph,
            recursive_limit - 1,
            seen,
            error_on_missing_imports=error_on_missing_imports,
        )
        new_g += dependency
    return new_g