import logging
import secrets
from collections import defaultdict
from copy import copy
from dataclasses import dataclass
from itertools import chain
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union
import pyshacl # type: ignore
from rdflib import BNode, Graph, Literal, URIRef
from rdflib.compare import _TripleCanonicalizer
from rdflib.paths import ZeroOrOne
from rdflib.term import Node
from sqlalchemy.exc import NoResultFound
from buildingmotif.namespaces import OWL, PARAM, RDF, SH, XSD, bind_prefixes
if TYPE_CHECKING:
from buildingmotif.dataclasses import Library, Template
Triple = Tuple[Node, Node, Node]
_gensym_counter = 0
def _strip_param(param: Union[Node, str]) -> str:
"""
Strips all PARAM namespaces from the input parameter
"""
param = str(param)
while param.startswith(PARAM):
param = param[len(PARAM) :]
return param
def _gensym(prefix: str = "p") -> URIRef:
"""
Generate a unique identifier.
"""
global _gensym_counter
_gensym_counter += 1
return PARAM[f"{prefix}{_gensym_counter}"]
def _param_name(param: URIRef) -> str:
"""
Returns the name of the param without the prefix.
"""
assert param.startswith(PARAM)
return param[len(PARAM) :]
def _guarantee_unique_template_name(library: "Library", name: str) -> str:
"""
Ensure that the template name is unique in the library by appending an increasing
number. This is only called when we are generating templates from GraphDiffs and
the names are local to the Library. The Library is intended to be ephemeral so these
names will not be around for long.
"""
idx = 1
original_name = name
try:
while library.get_template_by_name(name):
name = f"{original_name}_{idx}"
idx += 1
except NoResultFound:
# this means that the template does not exist and we can use the original name
pass
return name
[docs]def copy_graph(g: Graph, preserve_blank_nodes: bool = True) -> Graph:
"""
Copy a graph. Creates new blank nodes so that these remain unique to each Graph
:param g: the graph to copy
:type g: Graph
:param preserve_blank_nodes: if true, keep blank nodes the same when copying the graph
:type preserve_blank_nodes: boolean, defaults to True
:return: a copy of the input graph
:rtype: Graph
"""
c = Graph()
new_prefix = secrets.token_hex(4)
for t in g.triples((None, None, None)):
assert isinstance(t, tuple)
(s, p, o) = t
if not preserve_blank_nodes:
if isinstance(s, BNode):
s = BNode(value=new_prefix + s.toPython())
if isinstance(p, BNode):
p = BNode(value=new_prefix + p.toPython())
if isinstance(o, BNode):
o = BNode(value=new_prefix + o.toPython())
c.add((s, p, o))
return c
[docs]def inline_sh_nodes(g: Graph):
"""
Recursively inlines all sh:node properties and objects on the graph.
:param g: graph to be edited
:type g: Graph
"""
q = """
PREFIX sh: <http://www.w3.org/ns/shacl#>
CONSTRUCT {
?parent ?p ?o .
}
WHERE {
?parent sh:node ?child .
?child ?p ?o
}"""
original_size = 0
while original_size != len(g): # type: ignore
original_size = len(g) # type: ignore
for (s, p, o) in g.query(q): # type: ignore
if p == RDF.type and o == SH.NodeShape:
continue
g.add((s, p, o))
break
[docs]def combine_graphs(*graphs: Graph) -> Graph:
"""Combine all of the graphs into a new graph.
:return: combined graph
:rtype: Graph
"""
newg = Graph()
for graph in graphs:
newg += graph
return newg
[docs]def graph_size(g: Graph) -> int:
"""Returns the number of triples in a graph.
:param g: graph to be measured
:type g: Graph
:return: number of triples in the graph
:rtype: int
"""
return len(tuple(g.triples((None, None, None))))
[docs]def remove_triples_with_node(g: Graph, node: URIRef) -> None:
"""Remove all triples that include the given node. Edits the graph
in-place.
:param g: the graph to remove triples from
:type g: Graph
:param node: the node to remove
:type node: URIRef
"""
for triple in g.triples((node, None, None)):
g.remove(triple)
for triple in g.triples((None, node, None)):
g.remove(triple)
for triple in g.triples((None, None, node)):
g.remove(triple)
[docs]def replace_nodes(g: Graph, replace: Dict[Node, Node]) -> None:
"""Replace nodes in a graph.
:param g: graph to replace nodes in
:type g: Graph
:param replace: dict mapping old nodes to new nodes
:type replace: Dict[Node, Node]
"""
for s, p, o in g.triples((None, None, None)):
g.remove((s, p, o))
if s in replace:
s = replace[s]
if p in replace:
p = replace[p]
if o in replace:
o = replace[o]
g.add((s, p, o))
[docs]def get_ontology_files(directory: Path, recursive: bool = True) -> List[Path]:
"""Returns a list of all ontology files in the given directory.
If recursive is true, traverses the directory structure to find ontology
files not just in the given directory.
:param directory: the directory to begin the search
:type directory: Path
:param recursive: if true, find ontology files in nested directories,
defaults to true
:type recursive: bool, optional
:return: a list of filenames
:rtype: List[Path]
"""
patterns = [
"*.ttl",
"*.n3",
"*.rdf",
"*.xml",
]
if recursive:
searches = (directory.rglob(f"{pat}") for pat in patterns)
else:
searches = (directory.glob(f"{pat}") for pat in patterns)
# filter out files in .ipynb_checkpoints
filtered_searches = (
filter(lambda x: ".ipynb_checkpoints" not in Path(x).parts, search)
for search in searches
)
return list(chain.from_iterable(filtered_searches))
[docs]def get_template_parts_from_shape(
shape_name: URIRef, shape_graph: Graph
) -> Tuple[Graph, List[Dict]]:
"""Turn a SHACL shape into a template. The following attributes of
NodeShapes will be incorporated into the resulting template:
- sh:property with sh:minCount
- sh:property with sh:qualifiedMinCount
- sh:class
- sh:node
:param shape_name: name of shape
:type shape_name: URIRef
:param shape_graph: shape graph
:type shape_graph: Graph
:raises Exception: if more than one object type detected on shape
:raises Exception: if more than one min count detected on shape
:return: template parts
:rtype: Tuple[Graph, List[Dict]]
"""
# TODO: sh:or?
# the template body
body = Graph()
root_param = PARAM["name"]
deps = []
pshapes = shape_graph.objects(subject=shape_name, predicate=SH["property"])
for pshape in pshapes:
property_path = shape_graph.value(pshape, SH["path"])
if property_path is None:
raise Exception(
f"no sh:path detected on {shape_name} property shape {pshape}"
)
# TODO: expand otypes to include sh:in, sh:or, or no datatype at all!
otypes = list(
shape_graph.objects(
subject=pshape,
predicate=SH["qualifiedValueShape"]
* ZeroOrOne # type:ignore
/ (SH["class"] | SH["node"] | SH["datatype"]),
)
)
mincounts = list(
shape_graph.objects(
subject=pshape, predicate=SH["minCount"] | SH["qualifiedMinCount"]
)
)
if len(otypes) > 1:
raise Exception(f"more than one object type detected on {shape_name}")
if len(mincounts) > 1:
raise Exception(f"more than one min count detected on {shape_name}")
if len(mincounts) == 0 or len(otypes) == 0:
# print(f"No useful information on {shape_name} - {pshape}")
# print(shape_graph.cbd(pshape).serialize())
continue
(path, otype, mincount) = property_path, otypes[0], mincounts[0]
assert isinstance(mincount, Literal)
param_name = shape_graph.value(pshape, SH["name"])
for num in range(int(mincount)):
if param_name is not None:
param = PARAM[f"{param_name}{num}"]
else:
param = _gensym()
body.add((root_param, path, param))
otype_as_class = (None, SH["class"], otype) in shape_graph
otype_as_node = (None, SH["node"], otype) in shape_graph
otype_is_nodeshape = (otype, RDF.type, SH.NodeShape) in shape_graph
if (otype_as_class and otype_is_nodeshape) or otype_as_node:
deps.append({"template": str(otype), "args": {"name": param}})
body.add((param, RDF.type, otype))
pvalue = shape_graph.value(pshape, SH["hasValue"])
if pvalue:
body.add((root_param, path, pvalue))
if (shape_name, RDF.type, OWL.Class) in shape_graph:
body.add((root_param, RDF.type, shape_name))
classes = shape_graph.objects(shape_name, SH["class"])
for cls in classes:
body.add((root_param, RDF.type, cls))
classes = shape_graph.objects(shape_name, SH["targetClass"])
for cls in classes:
body.add((root_param, RDF.type, cls))
# for all objects of sh:node, add them to the deps if they haven't been added
# already through the property shapes above
nodes = shape_graph.cbd(shape_name).objects(predicate=SH["node"], unique=True)
for node in nodes:
# if node is already in deps, skip it
if any(str(node) == dep["template"] for dep in deps):
continue
deps.append(
{"template": str(node), "args": {"name": "name"}}
) # tie to root param
return body, deps
@dataclass
class _TemplateIndex:
template: "Template"
param_types: Dict[Node, List[Node]]
prop_types: Dict[Node, List[Node]]
prop_values: Dict[Node, List[Node]]
prop_shapes: Dict[Node, List[Node]]
target: URIRef
@property
def target_type(self):
return self.param_types[self.target][0]
def _prep_shape_graph() -> Graph:
shape = Graph()
bind_prefixes(shape)
shape.bind("mark", PARAM)
return shape
def _index_properties(templ: "Template") -> _TemplateIndex:
templ_graph = templ.evaluate(
{p: PARAM[p] for p in templ.parameters}, {"mark": PARAM}
)
assert isinstance(templ_graph, Graph)
# pick a random node to act as the 'target' of the shape
target = next(iter(templ_graph.subjects(RDF.type)))
print(f"Choosing {target} as the target of the shape")
assert isinstance(target, URIRef)
# store the classes for each parameter
param_types: Dict[Node, List[Node]] = defaultdict(list)
for (param, ptype) in templ_graph.subject_objects(RDF.type):
param_types[param].append(ptype)
# store the properties and their types for the target
prop_types: Dict[Node, List[Node]] = defaultdict(list)
prop_values: Dict[Node, List[Node]] = defaultdict(list)
prop_shapes: Dict[Node, List[Node]] = defaultdict(list)
# TODO: prop_shapes for all properties whose object corresponds to another shape
for p, o in templ_graph.predicate_objects(target):
if p == RDF.type:
continue
# maybe_param = str(o).removeprefix(PARAM) Python >=3.9
maybe_param = str(o)[len(PARAM) :]
if maybe_param in templ.dependency_parameters:
dep = templ.dependency_for_parameter(maybe_param)
if dep is not None:
prop_shapes[p].append(URIRef(dep._name))
elif o in param_types:
prop_types[p].append(param_types[o][0])
elif str(o) not in PARAM:
prop_values[p].append(o)
elif str(o) in PARAM and o not in param_types:
logging.warn(
f"{o} is does not have a type and does not seem to be a literal"
)
return _TemplateIndex(
templ,
dict(param_types),
dict(prop_types),
dict(prop_values),
dict(prop_shapes),
target,
)
def _add_property_shape(
graph: Graph, name: Node, constraint: Node, path: Node, value: Node
):
pshape = BNode()
graph.add((name, SH.property, pshape))
graph.add((pshape, SH.path, path))
graph.add((pshape, constraint, value))
graph.add((pshape, SH["minCount"], Literal(1)))
graph.add((pshape, SH["maxCount"], Literal(1)))
def _add_qualified_property_shape(
graph: Graph, name: Node, constraint: Node, path: Node, value: Node
):
pshape = BNode()
graph.add((name, SH.property, pshape))
graph.add((pshape, SH.path, path))
qvc = BNode()
graph.add((pshape, SH["qualifiedValueShape"], qvc))
graph.add((qvc, constraint, value))
graph.add((pshape, SH["qualifiedMinCount"], Literal(1)))
graph.add((pshape, SH["qualifiedMaxCount"], Literal(1)))
[docs]def template_to_shape(template: "Template") -> Graph:
"""Turn this template into a SHACL shape.
:param template: template to convert
:type template: template
:return: graph of template
:rtype: Graph
"""
# TODO If 'use_all' is True, this will create a shape that incorporates all
# Templates by the same name in the same Library.
templ = copy(template)
shape = _prep_shape_graph()
idx = _index_properties(templ)
shape.add((PARAM[templ.name], SH.targetClass, idx.target_type))
# create the shape
shape.add((PARAM[templ.name], RDF.type, SH.NodeShape))
shape.add((PARAM[templ.name], SH["class"], idx.target_type))
for prop, ptypes in idx.prop_types.items():
if len(ptypes) == 1:
_add_property_shape(shape, PARAM[templ.name], SH["class"], prop, ptypes[0])
else: # more than one ptype
for ptype in ptypes:
_add_qualified_property_shape(
shape, PARAM[templ.name], SH["class"], prop, ptype
)
for prop, values in idx.prop_values.items():
if len(values) == 1:
_add_property_shape(shape, PARAM[templ.name], SH.hasValue, prop, values[0])
else: # more than one ptype
for value in values:
_add_qualified_property_shape(
shape, PARAM[templ.name], SH.hasValue, prop, value
)
for prop, shapes in idx.prop_shapes.items():
if len(shapes) == 1:
_add_property_shape(shape, PARAM[templ.name], SH["node"], prop, shapes[0])
else: # more than one ptype
for shp in shapes:
_add_qualified_property_shape(
# TODO: fix this?
shape,
PARAM[templ.name],
SH.node,
prop,
PARAM[str(shp)],
)
return shape
[docs]def new_temporary_graph(more_namespaces: Optional[dict] = None) -> Graph:
"""Creates a new in-memory RDF graph with common and additional namespace
bindings.
:param more_namespaces: namespaces, defaults to None
:type more_namespaces: Optional[dict], optional
:return: graph with namespaces
:rtype: Graph
"""
g = Graph()
bind_prefixes(g)
if more_namespaces:
for prefix, uri in more_namespaces.items():
g.bind(prefix, uri)
return g
[docs]def get_parameters(graph: Graph) -> Set[str]:
"""Returns the set of parameter names in the given graph.
Parameters are identified by the PARAM namespace, `urn:___param___#`. This
method returns the names of the parameters, not the full URIs. For example,
the parameter `urn:___param___#abc` in a graph would be returned as `abc`.
:param graph: a graph containing parameters
:type graph: Graph
:return: a set of the parameter names in the graph
:rtype: Set[str]
"""
# get an iterable of all nodes in the graph
all_nodes = chain.from_iterable(graph.triples((None, None, None)))
# get all nodes in the PARAM namespace
params = {str(node) for node in all_nodes if str(node).startswith(PARAM)}
# extract the 'value' part of the param, which is the name of the parameter
return {node[len(PARAM) :] for node in params}
def _inline_sh_node(sg: Graph):
"""
This detects the use of 'sh:node' on SHACL NodeShapes and inlines
the shape they point to.
"""
q = """
PREFIX sh: <http://www.w3.org/ns/shacl#>
SELECT ?parent ?child WHERE {
?parent a sh:NodeShape ;
sh:node ?child .
}"""
for row in sg.query(q):
parent, child = row # type: ignore
sg.remove((parent, SH.node, child))
pos = sg.predicate_objects(child)
for (p, o) in pos:
sg.add((parent, p, o))
def _inline_sh_and(sg: Graph):
"""
This detects the use of 'sh:and' on SHACL NodeShapes and inlines
all of the included shapes
"""
q = """
PREFIX sh: <http://www.w3.org/ns/shacl#>
SELECT ?parent ?child ?andnode WHERE {
?parent a sh:NodeShape ;
sh:and ?andnode .
?andnode rdf:rest*/rdf:first ?child .
}"""
for row in sg.query(q):
parent, child, to_remove = row # type: ignore
sg.remove((parent, SH["and"], to_remove))
pos = sg.predicate_objects(child)
for (p, o) in pos:
sg.add((parent, p, o))
[docs]def rewrite_shape_graph(g: Graph) -> Graph:
"""
Rewrites the input graph to make the resulting validation report more useful.
:param g: the shape graph to rewrite
:type g: Graph
:return: a *copy* of the original shape graph w/ rewritten shapes
:rtype: Graph
"""
sg = copy_graph(g)
previous_size = -1
while len(sg) != previous_size: # type: ignore
previous_size = len(sg) # type: ignore
_inline_sh_and(sg)
# make sure to handle sh:node *after* sh:and
_inline_sh_node(sg)
return sg
[docs]def skip_uri(uri: URIRef) -> bool:
"""
Returns True if the URI should be skipped during processing
because it is axiomatic or not expected to be imported.
Skips URIs in the XSD and SHACL namespaces.
:return: True if the URI should be skipped; False otherwise
:rtype: bool
"""
# Now that we have all the templates, we can populate the dependencies.
# IGNORES missing XSD imports --- there is really no reason to import the XSD
# ontology because the handling is baked into the software processing the RDF
# graph. Thus, XSD URIs will always yield import warnings. This is noisy, so we
# suppress them.
skip_ns = [XSD, SH]
for ns in skip_ns:
if uri.startswith(ns):
return True
return False
[docs]def shacl_validate(
data_graph: Graph,
shape_graph: Optional[Graph] = None,
engine: Optional[str] = "topquadrant",
) -> Tuple[bool, Graph, str]:
"""
Validate the data graph against the shape graph.
Uses the fastest validation method available. Use the 'topquadrant' feature
to use TopQuadrant's SHACL engine. Defaults to using PySHACL.
:param data_graph: the graph to validate
:type data_graph: Graph
:param shape_graph: the shape graph to validate against
:type shape_graph: Graph, optional
:param engine: the SHACL engine to use, defaults to "topquadrant"
:type engine: str, optional
:return: a tuple containing the validation result, the validation report, and the validation report string
:rtype: Tuple[bool, Graph, str]
"""
if engine == "topquadrant":
try:
from brick_tq_shacl.topquadrant_shacl import (
validate as tq_validate, # type: ignore
)
return tq_validate(data_graph, shape_graph or Graph()) # type: ignore
except ImportError:
logging.info(
"TopQuadrant SHACL engine not available. Using PySHACL instead."
)
pass
data_graph = data_graph + (shape_graph or Graph())
return pyshacl.validate(
data_graph,
shacl_graph=shape_graph,
ont_graph=shape_graph,
advanced=True,
js=True,
allow_warnings=True,
) # type: ignore
[docs]def shacl_inference(
data_graph: Graph,
shape_graph: Optional[Graph] = None,
engine: Optional[str] = "topquadrant",
) -> Graph:
"""
Infer new triples in the data graph using the shape graph.
Edits the data graph in place. Uses the fastest inference method available.
Use the 'topquadrant' feature to use TopQuadrant's SHACL engine. Defaults to
using PySHACL.
:param data_graph: the graph to infer new triples in
:type data_graph: Graph
:param shape_graph: the shape graph to use for inference
:type shape_graph: Optional[Graph]
:param engine: the SHACL engine to use, defaults to "topquadrant"
:type engine: str, optional
:return: the data graph with inferred triples
:rtype: Graph
"""
if engine == "topquadrant":
try:
from brick_tq_shacl.topquadrant_shacl import infer as tq_infer
return tq_infer(data_graph, shape_graph or Graph()) # type: ignore
except ImportError:
logging.info(
"TopQuadrant SHACL engine not available. Using PySHACL instead."
)
pass
# We use a fixed-point computation approach to 'compiling' RDF models.
# We accomlish this by keeping track of the size of the graph before and after
# the inference step. If the size of the graph changes, then we know that the
# inference has had some effect. We do this at most 3 times to avoid looping
# forever.
pre_compile_length = len(data_graph) # type: ignore
pyshacl.validate(
data_graph=data_graph,
shacl_graph=shape_graph,
ont_graph=shape_graph,
advanced=True,
inplace=True,
js=True,
allow_warnings=True,
)
post_compile_length = len(data_graph) # type: ignore
attempts = 3
while attempts > 0 and post_compile_length != pre_compile_length:
pre_compile_length = len(data_graph) # type: ignore
pyshacl.validate(
data_graph=data_graph,
shacl_graph=shape_graph,
ont_graph=shape_graph,
advanced=True,
inplace=True,
js=True,
allow_warnings=True,
)
post_compile_length = len(data_graph) # type: ignore
attempts -= 1
return data_graph - (shape_graph or Graph())
[docs]def skolemize_shapes(g: Graph) -> Graph:
"""
Skolemize the shapes in the graph.
:param g: the graph to skolemize
:type g: Graph
:return: the skolemized graph
:rtype: Graph
"""
# write a query to update agraph by changing all PropertyShape blank nodes
# to URIRefs
g = copy_graph(g)
property_shapes = list(g.subjects(predicate=RDF.type, object=SH.PropertyShape))
property_shapes.extend(list(g.objects(predicate=SH.property)))
replacements = {}
for ps in property_shapes:
# if not bnode, skip
if not isinstance(ps, BNode):
continue
# create a new URIRef
new_ps = URIRef(f"urn:well-known/{secrets.token_hex(4)}")
# replace the old BNode with the new URIRef
replacements[ps] = new_ps
# apply the replacements
replace_nodes(g, replacements)
# name all objects of qualifiedValueShape
qvs = list(g.objects(predicate=SH.qualifiedValueShape))
replacements = {}
for qv in qvs:
# if not bnode, skip
if not isinstance(qv, BNode):
continue
# create a new URIRef
new_qv = URIRef(f"urn:well-known/{secrets.token_hex(4)}")
# replace the old BNode with the new URIRef
replacements[qv] = new_qv
# apply the replacements
replace_nodes(g, replacements)
return g
[docs]def graph_hash(graph: Graph) -> int:
"""
Returns a cryptographic hash of the graph contents.
This uses the same method as rdflib's isomorphic function to generate a cryptographic hash of a given graph.
This method calculates a consistent hash of the canonicalized form of the graph.
If the hashes of two graphs are equal, this means that the graphs are isomorphic.
Generating the hashes (using this method) and caching them allows graph isomorphism to be determined
without having to recalculate the canonical form of the graph, which can be expensive.
:param graph: graph to hash
:type graph: graph
:return: integer hash
:rtype: int
"""
# Copy graph to memory (improved performance if graph is backed by a DB store)
graph_prime = copy_graph(graph)
triple_canonicalizer = _TripleCanonicalizer(graph_prime)
return triple_canonicalizer.to_hash()