|
| 1 | +import concurrent.futures |
| 2 | +import threading |
| 3 | +from unittest import TestCase |
| 4 | +from unittest.mock import patch |
| 5 | + |
| 6 | +from awsiot.greengrasscoreipc.client import SubscribeToTopicStreamHandler |
| 7 | +from awsiot.greengrasscoreipc.model import CreateLocalDeploymentResponse, SubscribeToTopicResponse, \ |
| 8 | + SubscriptionResponseMessage, BinaryMessage |
| 9 | + |
| 10 | +TIMEOUT = 10.0 # seconds |
| 11 | + |
| 12 | + |
| 13 | +class GGV2Test(TestCase): |
| 14 | + |
| 15 | + def _mock_operation(self, mock_op, response): |
| 16 | + activate_fut = concurrent.futures.Future() |
| 17 | + activate_fut.set_result(None) |
| 18 | + mock_op.activate.return_value = activate_fut |
| 19 | + response_fut = concurrent.futures.Future() |
| 20 | + response_fut.set_result(response) |
| 21 | + mock_op.get_response.return_value = response_fut |
| 22 | + return mock_op |
| 23 | + |
| 24 | + @patch('awsiot.greengrasscoreipc.client.GreengrassCoreIPCClient') |
| 25 | + @patch('awsiot.greengrasscoreipc.client.CreateLocalDeploymentOperation') |
| 26 | + @patch('awsiot.greengrasscoreipc.client.SubscribeToTopicOperation') |
| 27 | + def test_connect(self, mock_client, mock_deployment_op, mock_subscribe_op): |
| 28 | + from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 as Client |
| 29 | + c = Client(client=mock_client) |
| 30 | + |
| 31 | + self._mock_operation(mock_deployment_op, CreateLocalDeploymentResponse(deployment_id="deployment")) |
| 32 | + mock_client.new_create_local_deployment.return_value = mock_deployment_op |
| 33 | + resp = c.create_local_deployment() |
| 34 | + self.assertEqual("deployment", resp.deployment_id) |
| 35 | + |
| 36 | + # Verify subscription works and callback is called on the executor thread |
| 37 | + self._mock_operation(mock_subscribe_op, SubscribeToTopicResponse(topic_name="abc")) |
| 38 | + mock_client.new_subscribe_to_topic.return_value = mock_subscribe_op |
| 39 | + |
| 40 | + subscription_fut = concurrent.futures.Future() |
| 41 | + thread_id_fut = concurrent.futures.Future() |
| 42 | + |
| 43 | + def on_stream_event(r): |
| 44 | + subscription_fut.set_result(r) |
| 45 | + thread_id_fut.set_result(threading.get_ident()) |
| 46 | + resp, op = c.subscribe_to_topic(topic="abc", on_stream_event=on_stream_event) |
| 47 | + self.assertEqual("abc", resp.topic_name) |
| 48 | + |
| 49 | + sub_handler = mock_client.new_subscribe_to_topic.call_args[0][0] |
| 50 | + sub_handler.on_stream_event(SubscriptionResponseMessage(binary_message=BinaryMessage(message="xyz"))) |
| 51 | + |
| 52 | + self.assertEqual("xyz".encode("utf-8"), subscription_fut.result(TIMEOUT).binary_message.message) |
| 53 | + self.assertNotEqual(threading.get_ident(), thread_id_fut.result(TIMEOUT)) |
| 54 | + |
| 55 | + # Verify that when using the stream_handler option, the callback is run in the executor |
| 56 | + |
| 57 | + subscription_fut = concurrent.futures.Future() |
| 58 | + thread_id_fut = concurrent.futures.Future() |
| 59 | + |
| 60 | + class handler(SubscribeToTopicStreamHandler): |
| 61 | + def on_stream_event(self, event): |
| 62 | + on_stream_event(event) |
| 63 | + resp, op = c.subscribe_to_topic(topic="abc", stream_handler=handler()) |
| 64 | + self.assertEqual("abc", resp.topic_name) |
| 65 | + |
| 66 | + sub_handler = mock_client.new_subscribe_to_topic.call_args[0][0] |
| 67 | + sub_handler.on_stream_event(SubscriptionResponseMessage(binary_message=BinaryMessage(message="xyz"))) |
| 68 | + |
| 69 | + self.assertEqual("xyz".encode("utf-8"), subscription_fut.result(TIMEOUT).binary_message.message) |
| 70 | + self.assertNotEqual(threading.get_ident(), thread_id_fut.result(TIMEOUT)) |
| 71 | + |
| 72 | + # Remove executor from client to verify that we are not running the callback in a different thread |
| 73 | + c = Client(client=mock_client, executor=None) |
| 74 | + |
| 75 | + subscription_fut = concurrent.futures.Future() |
| 76 | + thread_id_fut = concurrent.futures.Future() |
| 77 | + |
| 78 | + resp, op = c.subscribe_to_topic(topic="abc", on_stream_event=on_stream_event) |
| 79 | + self.assertEqual("abc", resp.topic_name) |
| 80 | + |
| 81 | + sub_handler = mock_client.new_subscribe_to_topic.call_args[0][0] |
| 82 | + sub_handler.on_stream_event(SubscriptionResponseMessage(binary_message=BinaryMessage(message="xyz"))) |
| 83 | + |
| 84 | + self.assertEqual("xyz".encode("utf-8"), subscription_fut.result(TIMEOUT).binary_message.message) |
| 85 | + self.assertEqual(threading.get_ident(), thread_id_fut.result(TIMEOUT)) |
0 commit comments