From 42bbda96a11ba3ba79918db46bda054e5a0e38d0 Mon Sep 17 00:00:00 2001 From: Eric Leong Date: Wed, 16 Mar 2022 21:53:00 +0000 Subject: [PATCH 1/5] fix send_pickle --- pylot/drivers/carla_camera_driver_operator.py | 4 ++-- pylot/drivers/carla_lidar_driver_operator.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pylot/drivers/carla_camera_driver_operator.py b/pylot/drivers/carla_camera_driver_operator.py index 447451036..be66edcf5 100644 --- a/pylot/drivers/carla_camera_driver_operator.py +++ b/pylot/drivers/carla_camera_driver_operator.py @@ -81,8 +81,8 @@ def release_data(self, timestamp): self._logger.debug("@{}: {} releasing sensor data".format( timestamp, self.config.name)) watermark_msg = erdos.WatermarkMessage(timestamp) - self._camera_stream.send_pickled(timestamp, - self._pickled_messages[timestamp]) + msg = pickle.loads(self._pickled_messages[timestamp]) + self._camera_stream.send(msg) # Note: The operator is set not to automatically propagate # watermark messages received on input streams. Thus, we can # issue watermarks only after the simulator callback is invoked. diff --git a/pylot/drivers/carla_lidar_driver_operator.py b/pylot/drivers/carla_lidar_driver_operator.py index e7f9c92fa..3eb2cea95 100644 --- a/pylot/drivers/carla_lidar_driver_operator.py +++ b/pylot/drivers/carla_lidar_driver_operator.py @@ -71,8 +71,8 @@ def release_data(self, timestamp): self._release_data = True else: watermark_msg = erdos.WatermarkMessage(timestamp) - self._lidar_stream.send_pickled(timestamp, - self._pickled_messages[timestamp]) + msg = pickle.loads(self._pickled_messages[timestamp]) + self._lidar_stream.send(msg) # Note: The operator is set not to automatically propagate # watermark messages received on input streams. Thus, we can # issue watermarks only after the simulator callback is invoked. From 6c25372c4b2a5b5ea468989a925aa9d62073ee14 Mon Sep 17 00:00:00 2001 From: Eric Leong Date: Thu, 17 Mar 2022 17:17:26 -0700 Subject: [PATCH 2/5] Update carla_camera_driver_operator.py --- pylot/drivers/carla_camera_driver_operator.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/pylot/drivers/carla_camera_driver_operator.py b/pylot/drivers/carla_camera_driver_operator.py index be66edcf5..489bd3c36 100644 --- a/pylot/drivers/carla_camera_driver_operator.py +++ b/pylot/drivers/carla_camera_driver_operator.py @@ -56,8 +56,8 @@ def __init__(self, ground_vehicle_id_stream: erdos.ReadStream, self._vehicle = None # The camera sensor actor object we obtain from the simulator. self._camera = None - self._pickle_lock = threading.Lock() - self._pickled_messages = {} + self._message_lock = threading.Lock() + self._messages = {} # Lock to ensure that the callbacks do not execute simultaneously. self._lock = threading.Lock() # If false then the operator does not send data until it receives @@ -81,8 +81,7 @@ def release_data(self, timestamp): self._logger.debug("@{}: {} releasing sensor data".format( timestamp, self.config.name)) watermark_msg = erdos.WatermarkMessage(timestamp) - msg = pickle.loads(self._pickled_messages[timestamp]) - self._camera_stream.send(msg) + self._camera_stream.send(self._messages[timestamp]) # Note: The operator is set not to automatically propagate # watermark messages received on input streams. Thus, we can # issue watermarks only after the simulator callback is invoked. @@ -170,9 +169,6 @@ def process_images(self, simulator_image): self._camera_stream.send(msg) self._camera_stream.send(watermark_msg) else: - # Pickle the data, and release it upon release msg receipt. - pickled_msg = pickle.dumps( - msg, protocol=pickle.HIGHEST_PROTOCOL) - with self._pickle_lock: - self._pickled_messages[msg.timestamp] = pickled_msg + with self._message_lock: + self._messages[msg.timestamp] = msg self._notify_reading_stream.send(watermark_msg) From 1f9e8ebb9a6ad5b7d8f0d66e25f18a16edc7bc65 Mon Sep 17 00:00:00 2001 From: Eric Leong Date: Thu, 17 Mar 2022 17:18:25 -0700 Subject: [PATCH 3/5] Update carla_lidar_driver_operator.py --- pylot/drivers/carla_lidar_driver_operator.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/pylot/drivers/carla_lidar_driver_operator.py b/pylot/drivers/carla_lidar_driver_operator.py index 3eb2cea95..2635a8ed6 100644 --- a/pylot/drivers/carla_lidar_driver_operator.py +++ b/pylot/drivers/carla_lidar_driver_operator.py @@ -50,8 +50,8 @@ def __init__(self, ground_vehicle_id_stream: erdos.ReadStream, self._vehicle = None # Handle to the Lidar simulator actor. self._lidar = None - self._pickle_lock = threading.Lock() - self._pickled_messages = {} + self._message_lock = threading.Lock() + self._messages = {} self._lock = threading.Lock() # If false then the operator does not send data until it receives # release data watermark. Otherwise, it sends as soon as it @@ -71,14 +71,13 @@ def release_data(self, timestamp): self._release_data = True else: watermark_msg = erdos.WatermarkMessage(timestamp) - msg = pickle.loads(self._pickled_messages[timestamp]) - self._lidar_stream.send(msg) + self._camera_stream.send(self._messages[timestamp]) # Note: The operator is set not to automatically propagate # watermark messages received on input streams. Thus, we can # issue watermarks only after the simulator callback is invoked. self._lidar_stream.send(watermark_msg) - with self._pickle_lock: - del self._pickled_messages[timestamp] + with self._message_lock: + del self._messages[timestamp] def process_point_clouds(self, simulator_pc): """ Invoked when a point cloud is received from the simulator. @@ -105,11 +104,8 @@ def process_point_clouds(self, simulator_pc): self._lidar_stream.send(msg) self._lidar_stream.send(watermark_msg) else: - # Pickle the data, and release it upon release msg receipt. - pickled_msg = pickle.dumps( - msg, protocol=pickle.HIGHEST_PROTOCOL) - with self._pickle_lock: - self._pickled_messages[msg.timestamp] = pickled_msg + with self._message_lock: + self._messages[msg.timestamp] = msg self._notify_reading_stream.send(watermark_msg) def run(self): From a1a79f54dcbc2b9c93dc4724180f8fd228638bf9 Mon Sep 17 00:00:00 2001 From: Eric Leong Date: Thu, 17 Mar 2022 17:18:55 -0700 Subject: [PATCH 4/5] Update carla_camera_driver_operator.py --- pylot/drivers/carla_camera_driver_operator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pylot/drivers/carla_camera_driver_operator.py b/pylot/drivers/carla_camera_driver_operator.py index 489bd3c36..36ee1eaf4 100644 --- a/pylot/drivers/carla_camera_driver_operator.py +++ b/pylot/drivers/carla_camera_driver_operator.py @@ -86,8 +86,8 @@ def release_data(self, timestamp): # watermark messages received on input streams. Thus, we can # issue watermarks only after the simulator callback is invoked. self._camera_stream.send(watermark_msg) - with self._pickle_lock: - del self._pickled_messages[timestamp] + with self._message_lock: + del self._messages[timestamp] def run(self): # Read the vehicle id from the vehicle id stream From 3713c48b926af8c252437980c8486fc07fed90d1 Mon Sep 17 00:00:00 2001 From: Eric Leong Date: Thu, 17 Mar 2022 17:19:37 -0700 Subject: [PATCH 5/5] Update carla_lidar_driver_operator.py --- pylot/drivers/carla_lidar_driver_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pylot/drivers/carla_lidar_driver_operator.py b/pylot/drivers/carla_lidar_driver_operator.py index 2635a8ed6..377bec14c 100644 --- a/pylot/drivers/carla_lidar_driver_operator.py +++ b/pylot/drivers/carla_lidar_driver_operator.py @@ -71,7 +71,7 @@ def release_data(self, timestamp): self._release_data = True else: watermark_msg = erdos.WatermarkMessage(timestamp) - self._camera_stream.send(self._messages[timestamp]) + self._lidar_stream.send(self._messages[timestamp]) # Note: The operator is set not to automatically propagate # watermark messages received on input streams. Thus, we can # issue watermarks only after the simulator callback is invoked.