Source code for buildingmotif.ingresses.brick
from rdflib import Graph, Literal, Namespace
from buildingmotif import BuildingMOTIF
from buildingmotif.dataclasses import Library
from buildingmotif.ingresses.bacnet import BACnetNetwork
from buildingmotif.ingresses.base import GraphIngressHandler
def _clean_uri(n) -> str:
if isinstance(n, str):
return n.replace(" ", "_")
return str(n)
[docs]class BACnetToBrickIngress(GraphIngressHandler):
"""Turns a BACnetNetwork RecordIngressHandler into a Brick model"""
BNS = Namespace("urn:brick_bacnet_scan/")
def __init__(self, bm: BuildingMOTIF, upstream: BACnetNetwork):
"""Create a new ignress handler for turning a BACnet network scrape
into a Brick model
:param bm: BuildingMOTIF instance
:type bm: BuildingMOTIF
:param upstream: the BACnetNetwork ingress handler to ingest from
:type upstream: BACnetNetwork
"""
super().__init__(bm)
self.upstream = upstream
self.bacnet_lib = Library.load(directory="bacnet")
self.device_template = self.bacnet_lib.get_template_by_name("brick-device")
self.object_template = self.bacnet_lib.get_template_by_name("brick-point")
[docs] def graph(self, ns: Namespace) -> Graph:
"""Generates a Brick graph from the BACnet network with all entities
placed in the given namespace.
:param ns: Namespace for all inferred entities
:type ns: Namespace
:return: RDF graph containing a Brick model of the BACnet network
:rtype: Graph
"""
g = Graph()
# ensure 'ns' is a Namespace or URI forming won't work
if not isinstance(ns, Namespace):
ns = Namespace(ns)
records = self.upstream.records
assert records is not None
for record in records:
if record.rtype == "Device":
dev = record.fields
device_id = dev["device_id"]
name = _clean_uri(device_id) or _clean_uri(dev["address"])
dev_graph = self.device_template.evaluate(
{
"name": ns[name],
"instance-number": Literal(device_id),
"address": Literal(dev["address"]),
}
)
assert isinstance(dev_graph, Graph)
g += dev_graph
elif record.rtype == "Object":
point = record.fields
device_id = point["device_id"]
obj_graph = self.object_template.evaluate(
{
"name": ns[f"{_clean_uri(point['name'])}-{point['address']}"],
"identifier": Literal(f"{point['type']},{point['address']}"),
"obj-name": Literal(point["name"]),
"device": ns[_clean_uri(device_id)],
}
)
assert isinstance(obj_graph, Graph)
g += obj_graph
return g