diff --git a/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/field_proxy_forwarder.py b/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/field_proxy_forwarder.py index 004efd3..e62a0ac 100644 --- a/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/field_proxy_forwarder.py +++ b/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/field_proxy_forwarder.py @@ -5,6 +5,13 @@ from gridappsd import GridAPPSD from gridappsd import topics +from cimgraph.databases import GridappsdConnection, BlazegraphConnection +from cimgraph.models import BusBranchModel, FeederModel + +import os +import cimgraph.utils as utils +import cimgraph.data_profile.cimhub_ufls as cim + REQUEST_FIELD = ".".join((topics.PROCESS_PREFIX, "request.field")) class FieldListener: @@ -29,13 +36,11 @@ def on_message(self, headers, message): request_type = request_data.get("request_type") if request_type == "get_context": response = self.ot_connection.get_response(headers["destination"],message) - self.proxy_connection.send(headers["reply_to"],response) + self.proxy_connection.send(headers["reply-to"],response) + elif request_type == "start_publishing": + response = self.ot_connection.get_response(headers["destination"],message) + self.proxy_connection.send(headers["reply-to"],json.dumps(response)) - elif 'goss.gridappsd.process.request' in headers["destination"]: - response = self.ot_connection.get_response(headers["destination"],message) - #print(response) - self.proxy_connection.send(headers["reply-to"],json.dumps(response)) - else: print(f"Unrecognized message received by Proxy: {message}") @@ -48,7 +53,7 @@ class FieldProxyForwarder: when direct connection is not possible. """ - def __init__(self, connection_url: str, username: str, password: str): + def __init__(self, connection_url: str, username: str, password: str, mrid :str): #Connect to OT self.ot_connection = GridAPPSD() @@ -67,12 +72,40 @@ def __init__(self, connection_url: str, username: str, password: str): #Subscribe to messages from field self.proxy_connection.subscribe(destination=topics.BASE_FIELD_TOPIC+'.*', id=1, ack="auto") - self.proxy_connection.subscribe(destination='goss.gridappsd.process.request.data.powergridmodel', id=2, ack="auto") - + self.proxy_connection.subscribe(destination='goss.gridappsd.process.request.*', id=2, ack="auto") + #Subscribe to messages on OT bus self.ot_connection.subscribe(topics.field_input_topic(), self.on_message_from_ot) + + + os.environ['CIMG_CIM_PROFILE'] = 'cimhub_ufls' + os.environ['CIMG_URL'] = 'http://localhost:8889/bigdata/namespace/kb/sparql' + os.environ['CIMG_DATABASE'] = 'powergridmodel' + os.environ['CIMG_NAMESPACE'] = 'http://iec.ch/TC57/CIM100#' + os.environ['CIMG_IEC61970_301'] = '8' + os.environ['CIMG_USE_UNITS'] = 'False' + + self.database = BlazegraphConnection() + distribution_area = cim.DistributionArea(mRID=mrid) + self.network = BusBranchModel( + connection=self.database, + container=distribution_area, + distributed=False) + self.network.get_all_edges(cim.DistributionArea) + self.network.get_all_edges(cim.Substation) + + for substation in self.network.graph.get(cim.Substation,{}).values(): + print(f'Subscribing to Substation: /topic/goss.gridappsd.field.{substation.mRID}') + self.ot_connection.subscribe('/topic/goss.gridappsd.field.'+substation.mRID, self.on_message_from_ot) + + + + #self.ot_connection.subscribe(topics.BASE_FIELD_TOPIC, self.on_message_from_ot) + + def on_message_from_ot(self, headers, message): + "Receives messages coming from OT bus (GridAPPS-D) and forwards to Proxy bus" try: print(f"Received message from OT: {message}") @@ -80,6 +113,9 @@ def on_message_from_ot(self, headers, message): if headers["destination"] == topics.field_input_topic(): self.proxy_connection.send(topics.field_input_topic(),json.dumps(message)) + if 'goss.gridappsd.field' in headers["destination"]: + + self.proxy_connection.send(headers["destination"],json.dumps(message)) else: print(f"Unrecognized message received by OT: {message}") @@ -93,12 +129,14 @@ def on_message_from_ot(self, headers, message): parser.add_argument("username") parser.add_argument("passwd") parser.add_argument("connection_url") + parser.add_argument("mrid") opts = parser.parse_args() proxy_connection_url = opts.connection_url proxy_username = opts.username proxy_password = opts.passwd + mrid = opts.mrid - proxy_forwarder = FieldProxyForwarder(proxy_connection_url, proxy_username, proxy_password) + proxy_forwarder = FieldProxyForwarder(proxy_connection_url, proxy_username, proxy_password, mrid) while True: time.sleep(0.1) diff --git a/gridappsd-python-lib/gridappsd/goss.py b/gridappsd-python-lib/gridappsd/goss.py index 12d49e2..869f24c 100644 --- a/gridappsd-python-lib/gridappsd/goss.py +++ b/gridappsd-python-lib/gridappsd/goss.py @@ -121,7 +121,7 @@ def __init__(self, if not self.__user__ or not self.__pass__: raise ValueError("Invalid username/password specified.") - self._heartbeat = int(os.environ.get(GRIDAPPSD_ENV_ENUM.GRIDAPPSD_HEARTBEAT.value, 4000)) + self._heartbeat = int(os.environ.get(GRIDAPPSD_ENV_ENUM.GRIDAPPSD_HEARTBEAT.value, 10000)) self._conn = None self._ids = set() self._topic_set = set() @@ -420,3 +420,14 @@ def on_error(self, header, message): _log.error("Error in callback router") _log.error(header) _log.error(message) + + def on_error(self, header, message): + _log.error("Error in callback router") + _log.error(header) + _log.error(message) + + def on_heartbeat_timeout(self): + _log.error("Heartbeat timeout") + + def on_disconnected(self): + _log.info("Disconnected")