import logging
import random
import string
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union
import rdflib
from rdflib import RDF, RDFS, Graph, URIRef
from rdflib.paths import ZeroOrMore, ZeroOrOne
from rdflib.term import Node
from buildingmotif import get_building_motif
from buildingmotif.namespaces import BMOTIF, OWL, SH
from buildingmotif.utils import Triple, copy_graph
if TYPE_CHECKING:
from buildingmotif import BuildingMOTIF
ONTOLOGY_FILE = (
Path(__file__).resolve().parents[1] / "resources" / "building_motif_ontology.ttl"
)
ontology = rdflib.Graph().parse(ONTOLOGY_FILE)
[docs]@dataclass
class ShapeCollection:
"""This class mirrors :py:class:`database.tables.DBShapeCollection`."""
_id: int
graph: rdflib.Graph
_bm: "BuildingMOTIF"
[docs] @classmethod
def create(cls) -> "ShapeCollection":
"""Create a new ShapeCollection.
:return: new ShapeCollection
:rtype: ShapeCollection
"""
bm = get_building_motif()
db_shape_collection = bm.table_connection.create_db_shape_collection()
graph = bm.graph_connection.create_graph(
db_shape_collection.graph_id, rdflib.Graph()
)
return cls(_id=db_shape_collection.id, graph=graph, _bm=bm)
[docs] @classmethod
def load(cls, id: int) -> "ShapeCollection":
"""Get ShapeCollection from database by id.
:param id: ShapeCollection id
:type id: int
:return: ShapeCollection
:rtype: ShapeCollection
"""
bm = get_building_motif()
db_shape_collection = bm.table_connection.get_db_shape_collection(id)
graph = bm.graph_connection.get_graph(db_shape_collection.graph_id)
return cls(_id=db_shape_collection.id, graph=graph, _bm=bm)
@property
def id(self) -> Optional[int]:
return self._id
@id.setter
def id(self, new_id):
raise AttributeError("Cannot modify db id")
@property
def graph_name(self) -> Optional[URIRef]:
"""
Returns the name of the graph (subject of "a owl:Ontology")
if one exists
"""
# will be None if this is not found
return self.graph.value(predicate=RDF.type, object=OWL.Ontology) # type: ignore
[docs] def add_triples(self, *triples: Triple) -> None:
"""Add the given triples to the graph.
:param triples: a sequence of triples to add to the graph
:type triples: Triple
"""
for triple in triples:
self.graph.add(triple)
[docs] def add_graph(self, graph: rdflib.Graph) -> None:
"""Add the given graph to the ShapeCollection.
:param graph: the graph to add to the ShapeCollection
:type graph: rdflib.Graph
"""
self.graph += graph
def _cbd(self, shape_name, self_contained=True):
"""Retrieves the Concise Bounded Description (CBD) of the shape."""
cbd = self.graph.cbd(shape_name)
# if computing self-contained, do the fixed-point computation produced by unioning
# the CBDs of all nodes in the current CBD until the graph does not change
changed = True
while self_contained and changed:
new_g = rdflib.Graph()
for node in cbd.all_nodes():
new_g += self.graph.cbd(node)
new_cbd = new_g + cbd
changed = len(new_cbd) > cbd
cbd = new_cbd
return cbd
[docs] def resolve_imports(
self, recursive_limit: int = -1, error_on_missing_imports: bool = True
) -> "ShapeCollection":
"""Resolves `owl:imports` to as many levels as requested.
By default, all `owl:imports` are recursively resolved. This limit can
be changed to 0 to suppress resolving imports, or to 1..n to handle
recursion up to that limit.
:param recursive_limit: how many levels of `owl:imports` to resolve,
defaults to -1 (all)
:type recursive_limit: int, optional
:param error_on_missing_imports: if True, raises an error if any of the dependency
ontologies are missing (i.e. they need to be loaded into BuildingMOTIF), defaults
to True
:type error_on_missing_imports: bool, optional
:return: a new ShapeCollection with the types resolved
:rtype: ShapeCollection
"""
resolved_namespaces: Set[rdflib.URIRef] = set()
resolved = _resolve_imports(
self.graph,
recursive_limit,
resolved_namespaces,
error_on_missing_imports=error_on_missing_imports,
)
new_sc = ShapeCollection.create()
new_sc.add_graph(resolved)
return new_sc
@classmethod
def _get_subclasses_of_definition_type(
cls, definition_type: URIRef
) -> List[URIRef]:
"""Get all the definition types in the ontology that are subclasses in
the given definition types.
:param definition_type: the given definition type
:type definition_type: URIRef
:return: list of included definition types
:rtype: List[URIRef]
"""
children = ontology.subjects(RDFS.subClassOf, definition_type)
results = [definition_type]
for child in children:
results += cls._get_subclasses_of_definition_type(child)
return results
@classmethod
def _get_included_domains(cls, domain: URIRef) -> List[URIRef]:
"""Get all the domains in the ontology that are included in the given
domains.
:param domain: the given domain
:type domain: URIRef
:return: list of included domains
:rtype: List[URIRef]
"""
children = ontology.subjects(BMOTIF.includes, domain)
results = [domain]
for child in children:
results += cls._get_included_domains(child)
return results
[docs] def get_shapes_of_definition_type(
self, definition_type: URIRef, include_labels=False
) -> Union[List[URIRef], List[Tuple[URIRef, str]]]:
"""Get subjects present in shape of the definition type.
:param definition_type: desired definition type
:type definition_type: URIRef
:return: subjects
:rtype: List[URIRef]
"""
definition_types = self._get_subclasses_of_definition_type(definition_type)
results = []
for definition_type in definition_types:
instances = self.graph.subjects(RDF.type, definition_type)
if include_labels:
results += [
(shape, self.graph.value(shape, RDFS.label)) for shape in instances
]
else:
results += instances
return results
[docs] def get_shapes_of_domain(self, domain: URIRef) -> List[URIRef]:
"""Get subjects present in shape of domain type.
:param domain: desired domain
:type domain: URIRef
:return: subjects
:rtype: List[URIRef]
"""
included_domains = self._get_included_domains(domain)
results = []
for domain in included_domains:
results += self.graph.subjects(RDF.type, domain)
return results
[docs] def get_shapes_about_class(
self, rdf_type: URIRef, contexts: Optional[List["ShapeCollection"]] = None
) -> List[URIRef]:
"""Returns a list of shapes that either target the given class (or
superclasses of it), or otherwise only apply to URIs of the given type.
:param rdf_type: an OWL class
:type rdf_type: URIRef
:param contexts: list of ShapeCollections that help determine the class
structure
:type contexts: List["ShapeCollection"], optional
:return: a list of shapes in this ShapeCollection that concern that
class
:rtype: List[URIRef]
"""
# merge the contexts together w/ our graph if they are provided, else
# just use the existing shape collection graph
if contexts is not None:
context = sum(map(lambda x: x.graph, contexts), rdflib.Graph())
graph = self.graph + context
else:
graph = self.graph
rows = graph.query(
f"""
PREFIX sh: <http://www.w3.org/ns/shacl#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT ?shape WHERE {{
?shape a sh:NodeShape .
{rdf_type.n3()} rdfs:subClassOf* ?class .
{{ ?shape sh:targetClass ?class }}
UNION
{{ ?shape sh:class ?class }}
}}"""
)
return [row[0] for row in rows] # type: ignore
[docs] def shape_to_query(self, shape: URIRef) -> str:
"""
This method takes a URI representing a SHACL shape as an argument and returns
a SPARQL query selecting the information which would be used to satisfy that
SHACL shape. This uses the following rules:
- `<shape> sh:targetClass <class>` -> `?target rdf:type/rdfs:subClassOf* <class>`
- `<shape> sh:property [ sh:path <path>; sh:class <class>; sh:name <name> ]` ->
?target <path> ?name . ?name rdf:type/rdfs:subClassOf* <class>
- `<shape> sh:property [ sh:path <path>; sh:hasValue <value>]` ->
?target <path> <value>
"""
clauses, project = _shape_to_where(self.graph, shape)
preamble = """PREFIX sh: <http://www.w3.org/ns/shacl#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
"""
return f"{preamble} SELECT {' '.join(project)} WHERE {{\n{clauses}\n}}"
def _is_list(graph: Graph, node: Node):
return (node, RDF.first, None) in graph
def _sh_path_to_path(graph: Graph, sh_path_value: Node):
# check if sh:path points to a list
if _is_list(graph, sh_path_value):
components = list(
graph.objects(sh_path_value, (RDF.rest * ZeroOrMore) / RDF.first) # type: ignore
)
return "/".join([_sh_path_to_path(graph, comp) for comp in components])
part = graph.value(sh_path_value, SH.oneOrMorePath)
if part is not None:
return f"{_sh_path_to_path(graph, part)}+"
part = graph.value(sh_path_value, SH.zeroOrMorePath)
if part is not None:
return f"{_sh_path_to_path(graph, part)}*"
part = graph.value(sh_path_value, SH.zeroOrOnePath)
if part is not None:
return f"{_sh_path_to_path(graph, part)}?"
return sh_path_value.n3()
def _shape_to_where(graph: Graph, shape: URIRef) -> Tuple[str, List[str]]:
# we will build the query as a string
clauses: str = ""
# build up the SELECT clause as a set of vars
project: Set[str] = {"?target"}
# local state for generating unique variable names
prefix = "".join(random.choice(string.ascii_lowercase) for _ in range(2))
variable_counter = 0
def gensym():
nonlocal variable_counter
varname = f"{prefix}{variable_counter}"
variable_counter += 1
return varname
# `<shape> sh:targetClass <class>` -> `?target rdf:type/rdfs:subClassOf* <class>`
targetClasses = graph.objects(shape, SH.targetClass | SH["class"])
tc_clauses = [
f"?target rdf:type/rdfs:subClassOf* {tc.n3()} .\n" for tc in targetClasses # type: ignore
]
clauses += " UNION ".join(tc_clauses)
# handle targetSubjectsOf
targetSubjectsOf = graph.objects(shape, SH.targetSubjectsOf)
tso_clauses = [
f"?target {tso.n3()} ?ignore .\n" for tso in targetSubjectsOf # type: ignore
]
clauses += " UNION ".join(tso_clauses)
# handle targetObjectsOf
targetObjectsOf = graph.objects(shape, SH.targetObjectsOf)
too_clauses = [
f"?ignore {too.n3()} ?target .\n" for too in targetObjectsOf # type: ignore
]
clauses += " UNION ".join(too_clauses)
# handle targetNode
targetNode = list(graph.objects(shape, SH.targetNode))
if len(targetNode) == 1:
clauses += f"BIND({targetNode[0].n3()} AS ?target) .\n"
elif len(targetNode) > 1:
raise ValueError(
"More than one targetNode found. This is not currently supported"
)
# find all of the non-qualified property shapes. All of these will use the same variable
# for all uses of the same sh:path value
pshapes_by_path: Dict[Node, List[Node]] = defaultdict(list)
for pshape in graph.objects(shape, SH.property):
path = _sh_path_to_path(graph, graph.value(pshape, SH.path))
if not graph.value(pshape, SH.qualifiedValueShape):
pshapes_by_path[path].append(pshape) # type: ignore
for dep_shape in graph.objects(shape, SH.node):
dep_clause, dep_project = _shape_to_where(graph, dep_shape)
clauses += dep_clause
project.update(dep_project)
for or_clause in graph.objects(shape, SH["or"]):
items = list(graph.objects(or_clause, (RDF.rest * ZeroOrMore) / RDF.first)) # type: ignore
or_parts = []
for item in items:
or_body, or_project = _shape_to_where(graph, item)
or_parts.append(or_body)
project.update(or_project)
clauses += " UNION ".join(f"{{ {or_body} }}" for or_body in or_parts)
# assign a unique variable for each sh:path w/o a qualified shape
pshape_vars: Dict[Node, str] = {}
for pshape_list in pshapes_by_path.values():
varname = f"?{gensym()}"
for pshape in pshape_list:
pshape_vars[pshape] = varname
for pshape in graph.objects(shape, SH.property):
# get the varname if we've already assigned one for this pshape above,
# or generate a new one. When generating a name, use the SH.name field
# in the PropertyShape or generate a unique one
name = pshape_vars.get(
pshape, f"?{graph.value(pshape, SH.name) or gensym()}".replace(" ", "_")
)
path = _sh_path_to_path(graph, graph.value(pshape, SH.path))
qMinCount = graph.value(pshape, SH.qualifiedMinCount) or 0
pclass = graph.value(
pshape, (SH["qualifiedValueShape"] * ZeroOrOne / SH["class"]) # type: ignore
)
if pclass:
clause = f"?target {path} {name} .\n {name} rdf:type/rdfs:subClassOf* {pclass.n3()} .\n"
if qMinCount == 0:
clause = f"OPTIONAL {{ {clause} }} .\n"
clauses += clause
project.add(name)
pnode = graph.value(
pshape, (SH["qualifiedValueShape"] * ZeroOrOne / SH["node"]) # type: ignore
)
if pnode:
node_clauses, node_project = _shape_to_where(graph, pnode)
clause = f"?target {path} {name} .\n"
clause += node_clauses.replace("?target", name)
if qMinCount == 0:
clause = f"OPTIONAL {{ {clause} }}"
clauses += clause
project.update({p.replace("?target", name) for p in node_project})
or_values = graph.value(
pshape, (SH["qualifiedValueShape"] * ZeroOrOne / SH["or"])
)
if or_values:
items = list(graph.objects(or_values, (RDF.rest * ZeroOrMore) / RDF.first))
or_parts = []
for item in items:
or_body, or_project = _shape_to_where(graph, item)
or_parts.append(or_body)
project.update(or_project)
clauses += " UNION ".join(f"{{ {or_body} }}" for or_body in or_parts)
pvalue = graph.value(pshape, SH.hasValue)
if pvalue:
clauses += f"?target {path} {pvalue.n3()} .\n"
return clauses, list(project)
def _resolve_imports(
graph: rdflib.Graph,
recursive_limit: int,
seen: Set[rdflib.URIRef],
error_on_missing_imports: bool = True,
) -> rdflib.Graph:
from buildingmotif.dataclasses.library import Library
bm = get_building_motif()
logger = logging.getLogger(__name__)
if recursive_limit == 0:
return graph
new_g = copy_graph(graph)
for ontology in graph.objects(predicate=OWL.imports):
if ontology in seen:
continue
seen.add(ontology)
# go find the graph definition from our libraries
try:
lib = Library.load(name=ontology)
sc_to_add = lib.get_shape_collection()
except Exception as e:
logger.warning(
"Could not resolve import of %s from Libraries (%s). Trying shape collections",
ontology,
e,
)
sc_to_add = None
# search through our shape collections for a graph with the provided name
if sc_to_add is None:
for shape_collection in bm.table_connection.get_all_db_shape_collections():
sc = ShapeCollection.load(shape_collection.id)
if sc.graph_name == ontology:
sc_to_add = sc
break
logger.warning(
"Could not resolve import of %s from Libraries. Trying shape collections",
ontology,
)
if sc_to_add is None:
if error_on_missing_imports:
raise Exception("Could not resolve import of %s", ontology)
continue
dependency = _resolve_imports(
sc_to_add.graph,
recursive_limit - 1,
seen,
error_on_missing_imports=error_on_missing_imports,
)
new_g += dependency
return new_g