diff --git a/python_transport/requirements.txt b/python_transport/requirements.txt index 33345c9b..ddbcff65 100644 --- a/python_transport/requirements.txt +++ b/python_transport/requirements.txt @@ -1,5 +1,5 @@ # wirepas -wirepas_messaging==1.2.0 +wirepas_messaging==1.4.0 # dbus bindings pydbus==0.6.0 diff --git a/python_transport/wirepas_gateway/protocol/packet_queue.py b/python_transport/wirepas_gateway/protocol/packet_queue.py new file mode 100644 index 00000000..efe8befd --- /dev/null +++ b/python_transport/wirepas_gateway/protocol/packet_queue.py @@ -0,0 +1,152 @@ +# Copyright 2020 Wirepas Ltd licensed under Apache License, Version 2.0 +# +# See file LICENSE for full license details. + +from threading import Thread, Event, Lock +from datetime import datetime, timedelta + + +class MessageQueue(Thread): + def __init__( + self, + logger, + on_multi_packet_ready_cb, + dst_endpoints=None, + max_packets=0, + max_size=0, + max_queuing_time_s=0, + filter_name="No name", + ): + Thread.__init__(self) + # Daemonize thread to exit with full process + self.daemon = True + + self.logger = logger + + if dst_endpoints.__len__() == 0: + self.logger.error( + "Cannot create a group of packets without endpoint criteria" + ) + raise ValueError + + if max_packets == 0 and max_size == 0 and max_queuing_time_s == 0: + self.logger.error( + "Group of packets need at least one criteria to end grouping" + ) + raise ValueError + + self.endpoints = dst_endpoints + self.max_packets = max_packets + self.max_size = max_size + self.max_queing_time_s = max_queuing_time_s + self.filter_name = filter_name + + self.running = False + self.message_received_event = Event() + self.message_received_event.clear() + + self._messages_list = [] + self._next_expiration_date = None + + self._flush = False + # Lock to protect the list + self._lock = Lock() + + self.on_packets_ready_cb = on_multi_packet_ready_cb + + self.logger.debug( + 'Group "%s" created with param: delay=%s max_p=%s max_s=%s' + % ( + self.filter_name, + self.max_queing_time_s, + self.max_packets, + self.max_size, + ) + ) + + def is_message_for_me(self, dst_endpoint): + if self.endpoints is None or dst_endpoint in self.endpoints: + self.logger.debug("Packet match filter %s" % self.filter_name) + return True + return False + + def flush(self): + # Force a send of what the queue contain + self._flush = True + self.message_received_event.set() + + def queue_message(self, message): + # Queue the message + with self._lock: + if len(self._messages_list) == 0 and self.max_queing_time_s >= 0: + # No message in queue yet, so take the current timestamp + self._next_expiration_date = datetime.now() + timedelta( + seconds=self.max_queing_time_s + ) + self.logger.debug( + "Set next expiration date to %s" % self._next_expiration_date + ) + + self._messages_list.append(message) + + # Notify the other thread that a message was received + self.message_received_event.set() + + def run(self): + """ + Main queue loop that is in charge of creating and sending the packet when needed + """ + self.running = True + while self.running: + # Compute timeout for next execution: + now = datetime.now() + if self._next_expiration_date is None: + timeout = None + elif self._next_expiration_date < now: + # It should never happen + timeout = 0 + else: + timeout = (self._next_expiration_date - now).total_seconds() + + self.logger.debug( + "Filter %s: waiting for again %s" % (self.filter_name, timeout) + ) + + self.message_received_event.wait(timeout) + + with self._lock: + # Check what happen + now = datetime.now() + send = False + if self.message_received_event.is_set(): + # We received a new message + # Is max packet reached + if len(self._messages_list) >= self.max_packets: + self.logger.debug( + "SEND: limit reached: %d packet in list vs %s" + % (len(self._messages_list), self.max_packets) + ) + send = True + + # Max size reached + # TODO evaluate full size packet + + # Is it time to flush + if self._flush: + self.logger.debug("SEND: Flush") + send = True + + # Clear the event + self.message_received_event.clear() + + # Max delay reached + if now >= self._next_expiration_date: + self.logger.debug("SEND: expiration date") + send = True + + if send: + # TODO list can contain more than max packets. Should be tested somwhere + if self.on_packets_ready_cb(self._messages_list, self.filter_name): + # Reset counter and list + self._next_expiration_date = None + self._messages_list.clear() diff --git a/python_transport/wirepas_gateway/protocol/topic_helper.py b/python_transport/wirepas_gateway/protocol/topic_helper.py index 84a5f51d..ac3d9cb7 100644 --- a/python_transport/wirepas_gateway/protocol/topic_helper.py +++ b/python_transport/wirepas_gateway/protocol/topic_helper.py @@ -68,6 +68,10 @@ def make_otap_process_scratchpad_request_topic(gw_id="+", sink_id="+"): def make_get_gateway_info_request_topic(gw_id): return TopicGenerator._make_request_topic("get_gw_info", [str(gw_id)]) + @staticmethod + def make_collection_request_topic(gw_id="+"): + return TopicGenerator._make_request_topic("multiple_request", [str(gw_id)]) + ################## # Response Part ################## @@ -133,6 +137,15 @@ def make_received_data_topic( [str(gw_id), str(sink_id), str(network_id), str(src_ep), str(dst_ep)], ) + @staticmethod + def make_received_multi_data_topic( + gw_id="+", sink_id="+", network_id="+", src_ep="+", dst_ep="+" + ): + return TopicGenerator._make_event_topic( + "multi_received_data", + [str(gw_id), str(sink_id), str(network_id), str(src_ep), str(dst_ep)], + ) + class TopicParser: """ diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index 9448f2e4..b6ae84f5 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -10,6 +10,7 @@ import wirepas_messaging from wirepas_gateway.dbus.dbus_client import BusClient +from wirepas_gateway.protocol.packet_queue import MessageQueue from wirepas_gateway.protocol.topic_helper import TopicGenerator, TopicParser from wirepas_gateway.protocol.mqtt_wrapper import MQTTWrapper from wirepas_gateway.utils import ParserHelper @@ -238,6 +239,50 @@ def __init__(self, settings, logger=None, **kwargs): ) self.monitoring_thread.start() + # Create a group + # TODO: take it from conf + group_ep_11 = MessageQueue( + logger=self.logger, + dst_endpoints=[11], + on_multi_packet_ready_cb=self._on_time_to_publish_group_cb, + max_packets=8, + max_queuing_time_s=100, + filter_name="Ep_10", + ) + + group_ep_255 = MessageQueue( + logger=self.logger, + dst_endpoints=[255], + on_multi_packet_ready_cb=self._on_time_to_publish_group_cb, + max_packets=10, + max_queuing_time_s=65, + filter_name="Diags", + ) + + group_ep_11.start() + group_ep_255.start() + + self._packet_group_filters = [group_ep_11, group_ep_255] + + self.message_handler_map = { + "GetConfigsRequest": self._process_get_configs_request, + "SetConfigRequest": self._process_set_config_request, + "SendDataRequest": self._process_send_data_request, + "GetScratchpadStatusRequest": self._process_otap_scratchpad_request, + "UploadScratchpadRequest": self._process_otap_upload_scratchpad_request, + "ProcessScratchpadRequest": self._process_otap_scratchpad_request, + "GetGatewayInfoRequest": self._process_get_gateway_info_request, + } + + def _on_time_to_publish_group_cb(self, messages, filter_name): + self.logger.debug("Time to send group data") + topic = "gw-event/multi_packet" + self.logger.debug("Uplink traffic: %s | group %s", topic, filter_name) + collection_message = wirepas_messaging.gateway.api.GenericCollection(messages) + + self.mqtt_wrapper.publish(topic, collection_message.payload, qos=1) + return True + def _on_mqtt_wrapper_termination_cb(self): """ Callback used to be informed when the MQTT wrapper has exited @@ -291,6 +336,9 @@ def _on_connect(self): topic, self._on_otap_process_scratchpad_request_received ) + topic = TopicGenerator.make_collection_request_topic(self.gw_id) + self.mqtt_wrapper.subscribe(topic, self._on_collection_message_received) + self._set_status() self.logger.info("MQTT connected!") @@ -309,6 +357,15 @@ def on_data_received( data, ): + sink = self.sink_manager.get_sink(sink_id) + if sink is None: + # It can happen at sink connection as messages can be received + # before sinks are identified + self.logger.info( + "Message received from unknown sink at the moment %s", sink_id + ) + return + if self.whitened_ep_filter is not None and dst_ep in self.whitened_ep_filter: # Only publish payload size but not the payload self.logger.debug("Filtering payload data") @@ -332,17 +389,16 @@ def on_data_received( hop_count=hop_count, ) - sink = self.sink_manager.get_sink(sink_id) - if sink is None: - # It can happen at sink connection as messages can be received - # before sinks are identified - self.logger.info( - "Message received from unknown sink at the moment %s", sink_id - ) - return - network_address = sink.get_network_address() + # Check if message must be queued + for group in self._packet_group_filters: + if group.is_message_for_me(dst_ep): + group.queue_message(event) + self.logger.debug("Message queued to %s queue" % group.filter_name) + return + + # Not a grouped message, publish it alone topic = TopicGenerator.make_received_data_topic( self.gw_id, sink_id, network_address, src_ep, dst_ep ) @@ -419,20 +475,9 @@ def on_sink_disconnected(self, name): self.logger.info("Sink disconnected, sending new configs") self._send_asynchronous_get_configs_response() - @deferred_thread - def _on_send_data_cmd_received(self, client, userdata, message): - # pylint: disable=unused-argument - try: - request = wirepas_messaging.gateway.api.SendDataRequest.from_payload( - message.payload - ) - except GatewayAPIParsingException as e: - self.logger.error(str(e)) - return - - # Get the sink-id from topic - _, sink_id = TopicParser.parse_send_data_topic(message.topic) - + def _process_send_data_request(self, request, sink_id=None): + if sink_id is None: + sink_id = request.sink_id self.logger.debug("Downlink traffic: %s | %s", sink_id, request.req_id) sink = self.sink_manager.get_sink(sink_id) @@ -464,17 +509,20 @@ def _on_send_data_cmd_received(self, client, userdata, message): self.mqtt_wrapper.publish(topic, response.payload, qos=2) @deferred_thread - def _on_get_configs_cmd_received(self, client, userdata, message): + def _on_send_data_cmd_received(self, client, userdata, message): # pylint: disable=unused-argument - self.logger.info("Config request received") try: - request = wirepas_messaging.gateway.api.GetConfigsRequest.from_payload( + request = wirepas_messaging.gateway.api.SendDataRequest.from_payload( message.payload ) + # Get the sink-id from topic (for backward compatibility) + _, sink_id = TopicParser.parse_send_data_topic(message.topic) + self._process_send_data_request(request, sink_id) except GatewayAPIParsingException as e: self.logger.error(str(e)) return + def _process_get_configs_request(self, request): # Create a list of different sink configs configs = [] for sink in self.sink_manager.get_sinks(): @@ -489,21 +537,19 @@ def _on_get_configs_cmd_received(self, client, userdata, message): self.mqtt_wrapper.publish(topic, response.payload, qos=2) - def _on_get_gateway_info_cmd_received(self, client, userdata, message): + @deferred_thread + def _on_get_configs_cmd_received(self, client, userdata, message): # pylint: disable=unused-argument - """ - This function doesn't need the decorator @deferred_thread as request is handled - without I/O - """ - self.logger.info("Gateway info request received") + self.logger.info("Config request received") try: - request = wirepas_messaging.gateway.api.GetGatewayInfoRequest.from_payload( + request = wirepas_messaging.gateway.api.GetConfigsRequest.from_payload( message.payload ) + self._process_get_configs_request(request) except GatewayAPIParsingException as e: self.logger.error(str(e)) - return + def _process_get_gateway_info_request(self, request): response = wirepas_messaging.gateway.api.GetGatewayInfoResponse( request.req_id, self.gw_id, @@ -517,18 +563,22 @@ def _on_get_gateway_info_cmd_received(self, client, userdata, message): topic = TopicGenerator.make_get_gateway_info_response_topic(self.gw_id) self.mqtt_wrapper.publish(topic, response.payload, qos=2) - @deferred_thread - def _on_set_config_cmd_received(self, client, userdata, message): + def _on_get_gateway_info_cmd_received(self, client, userdata, message): # pylint: disable=unused-argument - self.logger.info("Set config request received") + """ + This function doesn't need the decorator @deferred_thread as request is handled + without I/O + """ + self.logger.info("Gateway info request received") try: - request = wirepas_messaging.gateway.api.SetConfigRequest.from_payload( + request = wirepas_messaging.gateway.api.GetGatewayInfoRequest.from_payload( message.payload ) + self._process_get_gateway_info_request(request) except GatewayAPIParsingException as e: self.logger.error(str(e)) - return + def _process_set_config_request(self, request): self.logger.debug("Set sink config: %s", request) sink = self.sink_manager.get_sink(request.sink_id) if sink is not None: @@ -548,17 +598,18 @@ def _on_set_config_cmd_received(self, client, userdata, message): self.mqtt_wrapper.publish(topic, response.payload, qos=2) @deferred_thread - def _on_otap_status_request_received(self, client, userdata, message): + def _on_set_config_cmd_received(self, client, userdata, message): # pylint: disable=unused-argument - self.logger.info("OTAP status request received") + self.logger.info("Set config request received") try: - request = wirepas_messaging.gateway.api.GetScratchpadStatusRequest.from_payload( + request = wirepas_messaging.gateway.api.SetConfigRequest.from_payload( message.payload ) + self._process_set_config_request(request) except GatewayAPIParsingException as e: self.logger.error(str(e)) - return + def _process_otap_status_request(self, request): sink = self.sink_manager.get_sink(request.sink_id) if sink is not None: d = sink.get_scratchpad_status() @@ -589,17 +640,18 @@ def _on_otap_status_request_received(self, client, userdata, message): self.mqtt_wrapper.publish(topic, response.payload, qos=2) @deferred_thread - def _on_otap_upload_scratchpad_request_received(self, client, userdata, message): + def _on_otap_status_request_received(self, client, userdata, message): # pylint: disable=unused-argument - self.logger.info("OTAP upload request received") + self.logger.info("OTAP status request received") try: - request = wirepas_messaging.gateway.api.UploadScratchpadRequest.from_payload( + request = wirepas_messaging.gateway.api.GetScratchpadStatusRequest.from_payload( message.payload ) + self._process_otap_status_request(request) except GatewayAPIParsingException as e: self.logger.error(str(e)) - return + def _process_otap_upload_scratchpad_request(self, request): self.logger.info("OTAP upload request received for %s", request.sink_id) sink = self.sink_manager.get_sink(request.sink_id) @@ -619,17 +671,18 @@ def _on_otap_upload_scratchpad_request_received(self, client, userdata, message) self.mqtt_wrapper.publish(topic, response.payload, qos=2) @deferred_thread - def _on_otap_process_scratchpad_request_received(self, client, userdata, message): + def _on_otap_upload_scratchpad_request_received(self, client, userdata, message): # pylint: disable=unused-argument - self.logger.info("OTAP process request received") + self.logger.info("OTAP upload request received") try: - request = wirepas_messaging.gateway.api.ProcessScratchpadRequest.from_payload( + request = wirepas_messaging.gateway.api.UploadScratchpadRequest.from_payload( message.payload ) + self._process_otap_upload_scratchpad_request(request) except GatewayAPIParsingException as e: self.logger.error(str(e)) - return + def _process_otap_scratchpad_request(self, request): sink = self.sink_manager.get_sink(request.sink_id) if sink is not None: res = sink.process_scratchpad() @@ -646,6 +699,38 @@ def _on_otap_process_scratchpad_request_received(self, client, userdata, message self.mqtt_wrapper.publish(topic, response.payload, qos=2) + @deferred_thread + def _on_otap_process_scratchpad_request_received(self, client, userdata, message): + # pylint: disable=unused-argument + self.logger.info("OTAP process request received") + try: + request = wirepas_messaging.gateway.api.ProcessScratchpadRequest.from_payload( + message.payload + ) + self._process_otap_scratchpad_request(request) + except GatewayAPIParsingException as e: + self.logger.error(str(e)) + + @deferred_thread + def _on_collection_message_received(self, client, userdata, message): + self.logger.info("Collection message received") + try: + collection_message = wirepas_messaging.gateway.api.GenericCollection.from_payload( + message.payload + ) + except GatewayAPIParsingException as e: + self.logger.error(str(e)) + return + + for message in collection_message.messages: + try: + self.message_handler_map[message.__class__.__name__](message) + except KeyError: + self.logger.error( + "Gateway can handle this type of message %s" + % message.__class__.__name__ + ) + def parse_setting_list(list_setting): """ This function parse ep list specified from setting file or cmd line