Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions src/flexmeasures_client/s2/cem.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,9 @@ def register_control_type(self, control_type_handler: ControlTypeHandler):

async def handle_message(self, message: Dict | pydantic.BaseModel | str):
"""
This method handles the incoming messages to the CEM
and routes them to their custom handler. If certain
control type is active and there's a handler defined in both
the control type handler as well as in the CEM, it prevails the
on of the the control type.
This method handles the incoming messages to the CEM and routes them to their custom handler.
If a certain control type is active and there's a handler defined in both
the control type handler and in the CEM, then the one defined in the control type prevails.
"""

response = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def __init__(
self._timezone = pytz.timezone(timezone)

# delay the start of the schedule from the time `valid_from`
# of the FRBC.SystemDescritption.
# of the FRBC.SystemDescription.
self._valid_from_shift = valid_from_shift

def now(self):
Expand Down
42 changes: 29 additions & 13 deletions src/flexmeasures_client/s2/script/websockets_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import json
import logging
import os

import aiohttp
from aiohttp import web
Expand All @@ -9,6 +11,13 @@
from flexmeasures_client.s2.cem import CEM
from flexmeasures_client.s2.control_types.FRBC.frbc_simple import FRBCSimple

log_level = os.getenv("LOGGING_LEVEL", "WARNING").upper()
logging.basicConfig(
level=log_level,
format="[CEM][%(asctime)s] %(levelname)s: %(name)s | %(message)s",
)
LOGGER = logging.getLogger(__name__)


async def rm_details_watchdog(ws, cem: CEM):
"""This function will define a service in Home Assistant, or could
Expand All @@ -26,43 +35,46 @@ async def rm_details_watchdog(ws, cem: CEM):

# check/wait that the control type is set properly
while cem._control_type != ControlType.FILL_RATE_BASED_CONTROL:
print("waiting for the activation of the control type...")
cem._logger.debug("waiting for the activation of the control type...")
await asyncio.sleep(1)

print("CONTROL TYPE: ", cem._control_type)
cem._logger.debug(f"CONTROL TYPE: {cem._control_type}")

# after this, schedule will be triggered on reception of a new system description


async def websocket_producer(ws, cem: CEM):
print("start websocket message producer")
print("IS CLOSED? ", cem.is_closed())
cem._logger.debug("start websocket message producer")
cem._logger.debug(f"IS CLOSED? {cem.is_closed()}")
while not cem.is_closed():
message = await cem.get_message()
print("sending message")
cem._logger.debug("sending message")
await ws.send_json(message)
print("cem closed")
cem._logger.debug("cem closed")


async def websocket_consumer(ws, cem: CEM):
async for msg in ws:
print("RECEIVED: ", json.loads(msg.json()))
cem._logger.info(f"RECEIVED: {msg}")
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == "close":
# TODO: save cem state?
print("close...")
cem._logger.debug("close...")
cem.close()
await ws.close()
else:
await cem.handle_message(json.loads(msg.json()))
try:
await cem.handle_message(json.loads(msg.json()))
except TypeError:
await cem.handle_message(msg.json())

elif msg.type == aiohttp.WSMsgType.ERROR:
print("close...")
cem._logger.debug("close...")
cem.close()
print("ws connection closed with exception %s" % ws.exception())
cem._logger.error(f"ws connection closed with exception {ws.exception()}")
# TODO: save cem state?

print("websocket connection closed")
cem._logger.debug("websocket connection closed")


async def websocket_handler(request):
Expand All @@ -78,7 +90,11 @@ async def websocket_handler(request):
site_name, fm_client
)

cem = CEM(sensor_id=power_sensor["id"], fm_client=fm_client)
cem = CEM(
sensor_id=power_sensor["id"],
fm_client=fm_client,
logger=LOGGER,
)
frbc = FRBCSimple(
power_sensor_id=power_sensor["id"],
price_sensor_id=price_sensor["id"],
Expand Down