4747# MQTT Commands
4848MQTT_PINGREQ = b"\xc0 \0 "
4949MQTT_PINGRESP = const (0xD0 )
50+ MQTT_PUBLISH = const (0x30 )
5051MQTT_SUB = b"\x82 "
5152MQTT_UNSUB = b"\xA2 "
5253MQTT_DISCONNECT = b"\xe0 \0 "
@@ -879,7 +880,9 @@ def loop(self, timeout=0):
879880 def _wait_for_msg (self , timeout = 0.1 ):
880881 # pylint: disable = too-many-return-statements
881882
882- """Reads and processes network events."""
883+ """Reads and processes network events.
884+ Return the packet type or None if there is nothing to be received.
885+ """
883886 # CPython socket module contains a timeout attribute
884887 if hasattr (self ._socket_pool , "timeout" ):
885888 try :
@@ -909,8 +912,11 @@ def _wait_for_msg(self, timeout=0.1):
909912 "Unexpected PINGRESP returned from broker: {}." .format (sz )
910913 )
911914 return MQTT_PINGRESP
912- if res [0 ] & 0xF0 != 0x30 :
915+
916+ if res [0 ] & 0xF0 != MQTT_PUBLISH :
913917 return res [0 ]
918+
919+ # Handle only the PUBLISH packet type from now on.
914920 sz = self ._recv_len ()
915921 # topic length MSB & LSB
916922 topic_len = self ._sock_exact_recv (2 )
@@ -923,12 +929,13 @@ def _wait_for_msg(self, timeout=0.1):
923929 pid = self ._sock_exact_recv (2 )
924930 pid = pid [0 ] << 0x08 | pid [1 ]
925931 sz -= 0x02
932+
926933 # read message contents
927934 raw_msg = self ._sock_exact_recv (sz )
928935 msg = raw_msg if self ._use_binary_mode else str (raw_msg , "utf-8" )
929936 if self .logger is not None :
930937 self .logger .debug (
931- "Receiving SUBSCRIBE \n Topic: %s\n Msg: %s\n " , topic , raw_msg
938+ "Receiving PUBLISH \n Topic: %s\n Msg: %s\n " , topic , raw_msg
932939 )
933940 self ._handle_on_message (self , topic , msg )
934941 if res [0 ] & 0x06 == 0x02 :
@@ -937,6 +944,7 @@ def _wait_for_msg(self, timeout=0.1):
937944 self ._sock .send (pkt )
938945 elif res [0 ] & 6 == 4 :
939946 assert 0
947+
940948 return res [0 ]
941949
942950 def _recv_len (self ):
0 commit comments