Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
from gridappsd import GridAPPSD
from gridappsd import topics

from cimgraph.databases import GridappsdConnection, BlazegraphConnection
from cimgraph.models import BusBranchModel, FeederModel

import os
import cimgraph.utils as utils
import cimgraph.data_profile.cimhub_ufls as cim

REQUEST_FIELD = ".".join((topics.PROCESS_PREFIX, "request.field"))

class FieldListener:
Expand All @@ -29,13 +36,11 @@ def on_message(self, headers, message):
request_type = request_data.get("request_type")
if request_type == "get_context":
response = self.ot_connection.get_response(headers["destination"],message)
self.proxy_connection.send(headers["reply_to"],response)
self.proxy_connection.send(headers["reply-to"],response)
elif request_type == "start_publishing":
response = self.ot_connection.get_response(headers["destination"],message)
self.proxy_connection.send(headers["reply-to"],json.dumps(response))

elif 'goss.gridappsd.process.request' in headers["destination"]:
response = self.ot_connection.get_response(headers["destination"],message)
#print(response)
self.proxy_connection.send(headers["reply-to"],json.dumps(response))

else:
print(f"Unrecognized message received by Proxy: {message}")

Expand All @@ -48,7 +53,7 @@ class FieldProxyForwarder:
when direct connection is not possible.
"""

def __init__(self, connection_url: str, username: str, password: str):
def __init__(self, connection_url: str, username: str, password: str, mrid :str):

#Connect to OT
self.ot_connection = GridAPPSD()
Expand All @@ -67,19 +72,50 @@ def __init__(self, connection_url: str, username: str, password: str):

#Subscribe to messages from field
self.proxy_connection.subscribe(destination=topics.BASE_FIELD_TOPIC+'.*', id=1, ack="auto")
self.proxy_connection.subscribe(destination='goss.gridappsd.process.request.data.powergridmodel', id=2, ack="auto")

self.proxy_connection.subscribe(destination='goss.gridappsd.process.request.*', id=2, ack="auto")
#Subscribe to messages on OT bus
self.ot_connection.subscribe(topics.field_input_topic(), self.on_message_from_ot)



os.environ['CIMG_CIM_PROFILE'] = 'cimhub_ufls'
os.environ['CIMG_URL'] = 'http://localhost:8889/bigdata/namespace/kb/sparql'
os.environ['CIMG_DATABASE'] = 'powergridmodel'
os.environ['CIMG_NAMESPACE'] = 'http://iec.ch/TC57/CIM100#'
os.environ['CIMG_IEC61970_301'] = '8'
os.environ['CIMG_USE_UNITS'] = 'False'

self.database = BlazegraphConnection()
distribution_area = cim.DistributionArea(mRID=mrid)
self.network = BusBranchModel(
connection=self.database,
container=distribution_area,
distributed=False)
self.network.get_all_edges(cim.DistributionArea)
self.network.get_all_edges(cim.Substation)

for substation in self.network.graph.get(cim.Substation,{}).values():
print(f'Subscribing to Substation: /topic/goss.gridappsd.field.{substation.mRID}')
self.ot_connection.subscribe('/topic/goss.gridappsd.field.'+substation.mRID, self.on_message_from_ot)



#self.ot_connection.subscribe(topics.BASE_FIELD_TOPIC, self.on_message_from_ot)


def on_message_from_ot(self, headers, message):

"Receives messages coming from OT bus (GridAPPS-D) and forwards to Proxy bus"
try:
print(f"Received message from OT: {message}")

if headers["destination"] == topics.field_input_topic():
self.proxy_connection.send(topics.field_input_topic(),json.dumps(message))

if 'goss.gridappsd.field' in headers["destination"]:

self.proxy_connection.send(headers["destination"],json.dumps(message))
else:
print(f"Unrecognized message received by OT: {message}")

Expand All @@ -93,12 +129,14 @@ def on_message_from_ot(self, headers, message):
parser.add_argument("username")
parser.add_argument("passwd")
parser.add_argument("connection_url")
parser.add_argument("mrid")
opts = parser.parse_args()
proxy_connection_url = opts.connection_url
proxy_username = opts.username
proxy_password = opts.passwd
mrid = opts.mrid

proxy_forwarder = FieldProxyForwarder(proxy_connection_url, proxy_username, proxy_password)
proxy_forwarder = FieldProxyForwarder(proxy_connection_url, proxy_username, proxy_password, mrid)

while True:
time.sleep(0.1)
13 changes: 12 additions & 1 deletion gridappsd-python-lib/gridappsd/goss.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def __init__(self,
if not self.__user__ or not self.__pass__:
raise ValueError("Invalid username/password specified.")

self._heartbeat = int(os.environ.get(GRIDAPPSD_ENV_ENUM.GRIDAPPSD_HEARTBEAT.value, 4000))
self._heartbeat = int(os.environ.get(GRIDAPPSD_ENV_ENUM.GRIDAPPSD_HEARTBEAT.value, 10000))
self._conn = None
self._ids = set()
self._topic_set = set()
Expand Down Expand Up @@ -420,3 +420,14 @@ def on_error(self, header, message):
_log.error("Error in callback router")
_log.error(header)
_log.error(message)

def on_error(self, header, message):
_log.error("Error in callback router")
_log.error(header)
_log.error(message)

def on_heartbeat_timeout(self):
_log.error("Heartbeat timeout")

def on_disconnected(self):
_log.info("Disconnected")
Loading