diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 195bf2d..8776272 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,7 +15,7 @@ env: jobs: unit-tests: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest strategy: fail-fast: false diff --git a/AWSIoTPythonSDK/core/protocol/mqtt_core.py b/AWSIoTPythonSDK/core/protocol/mqtt_core.py index fbdd6bf..bff6456 100644 --- a/AWSIoTPythonSDK/core/protocol/mqtt_core.py +++ b/AWSIoTPythonSDK/core/protocol/mqtt_core.py @@ -28,7 +28,7 @@ from AWSIoTPythonSDK.core.protocol.internal.defaults import METRICS_PREFIX from AWSIoTPythonSDK.core.protocol.internal.defaults import ALPN_PROTCOLS from AWSIoTPythonSDK.core.protocol.internal.events import FixedEventMids -from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS +from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS, SUBACK_ERROR from AWSIoTPythonSDK.exception.AWSIoTExceptions import connectError from AWSIoTPythonSDK.exception.AWSIoTExceptions import connectTimeoutException from AWSIoTPythonSDK.exception.AWSIoTExceptions import disconnectError @@ -41,7 +41,7 @@ from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeQueueDisabledException from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeQueueFullException from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeQueueDisabledException -from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError +from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError, subackError from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeTimeoutException from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeError from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeTimeoutException @@ -58,6 +58,12 @@ from queue import Queue +class SubackPacket(object): + def __init__(self): + self.event = Event() + self.data = None + + class MqttCore(object): _logger = logging.getLogger(__name__) @@ -298,12 +304,15 @@ def subscribe(self, topic, qos, message_callback=None): if ClientStatus.STABLE != self._client_status.get_status(): self._handle_offline_request(RequestTypes.SUBSCRIBE, (topic, qos, message_callback, None)) else: - event = Event() - rc, mid = self._subscribe_async(topic, qos, self._create_blocking_ack_callback(event), message_callback) - if not event.wait(self._operation_timeout_sec): + suback = SubackPacket() + rc, mid = self._subscribe_async(topic, qos, self._create_blocking_suback_callback(suback), message_callback) + if not suback.event.wait(self._operation_timeout_sec): self._internal_async_client.remove_event_callback(mid) self._logger.error("Subscribe timed out") raise subscribeTimeoutException() + if suback.data and suback.data[0] == SUBACK_ERROR: + self._logger.error(f"Suback error return code: {suback.data[0]}") + raise subackError(suback=suback.data) ret = True return ret @@ -361,6 +370,12 @@ def ack_callback(mid, data=None): event.set() return ack_callback + def _create_blocking_suback_callback(self, ack: SubackPacket): + def ack_callback(mid, data=None): + ack.data = data + ack.event.set() + return ack_callback + def _handle_offline_request(self, type, data): self._logger.info("Offline request detected!") offline_request = QueueableRequest(type, data) diff --git a/AWSIoTPythonSDK/core/protocol/paho/client.py b/AWSIoTPythonSDK/core/protocol/paho/client.py index 0b637c5..1c60c81 100755 --- a/AWSIoTPythonSDK/core/protocol/paho/client.py +++ b/AWSIoTPythonSDK/core/protocol/paho/client.py @@ -97,6 +97,9 @@ CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4 CONNACK_REFUSED_NOT_AUTHORIZED = 5 +# SUBACK codes +SUBACK_ERROR = 0x80 + # Connection state mqtt_cs_new = 0 mqtt_cs_connected = 1 @@ -137,6 +140,7 @@ MSG_QUEUEING_DROP_OLDEST = 0 MSG_QUEUEING_DROP_NEWEST = 1 + if sys.version_info[0] < 3: sockpair_data = "0" else: diff --git a/AWSIoTPythonSDK/exception/AWSIoTExceptions.py b/AWSIoTPythonSDK/exception/AWSIoTExceptions.py index 0de5401..d5b6d63 100755 --- a/AWSIoTPythonSDK/exception/AWSIoTExceptions.py +++ b/AWSIoTPythonSDK/exception/AWSIoTExceptions.py @@ -79,6 +79,11 @@ class subscribeError(operationError.operationError): def __init__(self, errorCode): self.message = "Subscribe Error: " + str(errorCode) +class subackError(operationError.operationError): + def __init__(self, suback=None): + self.message = "Received Error suback. Subscription failed." + self.suback = suback + class subscribeQueueFullException(operationError.operationError): def __init__(self): diff --git a/test/core/protocol/test_mqtt_core.py b/test/core/protocol/test_mqtt_core.py index 8469ea6..3cb0cb0 100644 --- a/test/core/protocol/test_mqtt_core.py +++ b/test/core/protocol/test_mqtt_core.py @@ -1,5 +1,6 @@ import AWSIoTPythonSDK from AWSIoTPythonSDK.core.protocol.mqtt_core import MqttCore +from AWSIoTPythonSDK.core.protocol.mqtt_core import SubackPacket from AWSIoTPythonSDK.core.protocol.internal.clients import InternalAsyncMqttClient from AWSIoTPythonSDK.core.protocol.internal.clients import ClientStatusContainer from AWSIoTPythonSDK.core.protocol.internal.clients import ClientStatus @@ -20,6 +21,7 @@ from AWSIoTPythonSDK.exception.AWSIoTExceptions import publishQueueFullException from AWSIoTPythonSDK.exception.AWSIoTExceptions import publishQueueDisabledException from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError +from AWSIoTPythonSDK.exception.AWSIoTExceptions import subackError from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeTimeoutException from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeQueueFullException from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeQueueDisabledException @@ -29,6 +31,7 @@ from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeQueueDisabledException from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_ERRNO +from AWSIoTPythonSDK.core.protocol.paho.client import SUBACK_ERROR from AWSIoTPythonSDK.core.protocol.paho.client import MQTTv311 from AWSIoTPythonSDK.core.protocol.internal.defaults import ALPN_PROTCOLS try: @@ -61,6 +64,7 @@ KEY_EXPECTED_QUEUE_APPEND_RESULT = "ExpectedQueueAppendResult" KEY_EXPECTED_REQUEST_MID_OVERRIDE = "ExpectedRequestMidOverride" KEY_EXPECTED_REQUEST_TIMEOUT = "ExpectedRequestTimeout" +KEY_EXPECTED_ACK_RESULT = "ExpectedAckPacketResult" SUCCESS_RC_EXPECTED_VALUES = { KEY_EXPECTED_REQUEST_RC : DUMMY_SUCCESS_RC } @@ -73,6 +77,10 @@ NO_TIMEOUT_EXPECTED_VALUES = { KEY_EXPECTED_REQUEST_TIMEOUT : False } +ERROR_SUBACK_EXPECTED_VALUES = { + KEY_EXPECTED_ACK_RESULT : (SUBACK_ERROR, None) +} + QUEUED_EXPECTED_VALUES = { KEY_EXPECTED_QUEUE_APPEND_RESULT : AppendResults.APPEND_SUCCESS } @@ -121,6 +129,9 @@ def setup_class(cls): RequestTypes.SUBSCRIBE: subscribeError, RequestTypes.UNSUBSCRIBE: unsubscribeError } + cls.ack_error = { + RequestTypes.SUBSCRIBE : subackError, + } cls.request_queue_full = { RequestTypes.PUBLISH : publishQueueFullException, RequestTypes.SUBSCRIBE: subscribeQueueFullException, @@ -518,6 +529,9 @@ def test_subscribe_success(self): def test_subscribe_timeout(self): self._internal_test_sync_api_with(RequestTypes.SUBSCRIBE, TIMEOUT_EXPECTED_VALUES) + + def test_subscribe_error_suback(self): + self._internal_test_sync_api_with(RequestTypes.SUBSCRIBE, ERROR_SUBACK_EXPECTED_VALUES) def test_subscribe_queued(self): self._internal_test_sync_api_with(RequestTypes.SUBSCRIBE, QUEUED_EXPECTED_VALUES) @@ -547,6 +561,7 @@ def _internal_test_sync_api_with(self, request_type, expected_values): expected_request_mid = expected_values.get(KEY_EXPECTED_REQUEST_MID_OVERRIDE) expected_timeout = expected_values.get(KEY_EXPECTED_REQUEST_TIMEOUT) expected_append_result = expected_values.get(KEY_EXPECTED_QUEUE_APPEND_RESULT) + expected_suback_result = expected_values.get(KEY_EXPECTED_ACK_RESULT) if expected_request_mid is None: expected_request_mid = DUMMY_REQUEST_MID @@ -562,7 +577,16 @@ def _internal_test_sync_api_with(self, request_type, expected_values): self.invoke_mqtt_core_sync_api[request_type](self, message_callback) else: self.python_event_mock.wait.return_value = True - assert self.invoke_mqtt_core_sync_api[request_type](self, message_callback) is True + if expected_suback_result is not None: + self._use_mock_python_suback() + # mock the suback with expected suback result + self.python_suback_mock.data = expected_suback_result + if expected_suback_result[0] == SUBACK_ERROR: + with pytest.raises(self.ack_error[request_type]): + self.invoke_mqtt_core_sync_api[request_type](self, message_callback) + self.python_suback_patcher.stop() + else: + assert self.invoke_mqtt_core_sync_api[request_type](self, message_callback) is True if expected_append_result is not None: self.client_status_mock.get_status.return_value = ClientStatus.ABNORMAL_DISCONNECT @@ -583,3 +607,10 @@ def _use_mock_python_event(self): self.python_event_constructor = self.python_event_patcher.start() self.python_event_mock = MagicMock() self.python_event_constructor.return_value = self.python_event_mock + + # Create a SubackPacket mock, which would mock the data in SubackPacket + def _use_mock_python_suback(self): + self.python_suback_patcher = patch(PATCH_MODULE_LOCATION + "SubackPacket", spec=SubackPacket) + self.python_suback_constructor = self.python_suback_patcher.start() + self.python_suback_mock = MagicMock() + self.python_suback_constructor.return_value = self.python_suback_mock