From 06c897420fe1fc53fbc4a0a20b23e4679518f211 Mon Sep 17 00:00:00 2001 From: Quentin Bernet Date: Fri, 9 May 2025 11:41:34 +0200 Subject: [PATCH 1/5] fix: "open api call" in the operator UI sometimes leading to the wrong call --- tofupilot/client.py | 26 +++++++++++--------- tofupilot/openhtf/tofupilot.py | 18 ++++++++++---- tofupilot/openhtf/upload.py | 45 ++++++++++++++++------------------ 3 files changed, 49 insertions(+), 40 deletions(-) diff --git a/tofupilot/client.py b/tofupilot/client.py index d226f82..51068be 100644 --- a/tofupilot/client.py +++ b/tofupilot/client.py @@ -224,12 +224,13 @@ def create_run_from_openhtf_report(self, file_path: str): https://www.tofupilot.com/docs/api#create-a-run-from-a-file """ # Upload report and create run from file_path - run_id = self.upload_and_create_from_openhtf_report(file_path) + upload_res = self.upload_and_create_from_openhtf_report(file_path) - # If run_id is not a string, it's an error response dictionary - if not isinstance(run_id, str): + if not upload_res.get("success", False): self._logger.error("OpenHTF import failed") - return run_id + return "" + + run_id = upload_res["run_id"] # Only continue with attachment upload if run_id is valid test_record = None @@ -396,13 +397,12 @@ def delete_unit(self, serial_number: str) -> dict: def upload_and_create_from_openhtf_report( self, file_path: str, - ) -> str: + ) -> Dict: """ Takes a path to an OpenHTF JSON file report, uploads it and creates a run from it. Returns: - str: - Id of the newly created run + Dict """ print("") @@ -449,8 +449,8 @@ def upload_and_create_from_openhtf_report( verify=self._verify, ) - # Return only the ID if successful, otherwise return the full result - if result.get("success", False) is not False: + # Return only the ID if successful, otherwise return the full result + if result.get("success", True) is not False: run_id = result.get("id") run_url = result.get("url") @@ -460,9 +460,13 @@ def upload_and_create_from_openhtf_report( elif run_id: self._logger.success(f"Run imported successfully with ID: {run_id}") - return run_id + return { + "success": True, + "run_id": run_id, + "upload_id": upload_id, + } else: - return result + return {**result, "upload_id": upload_id,} def get_connection_credentials(self) -> dict: """ diff --git a/tofupilot/openhtf/tofupilot.py b/tofupilot/openhtf/tofupilot.py index 3f6a051..135f560 100644 --- a/tofupilot/openhtf/tofupilot.py +++ b/tofupilot/openhtf/tofupilot.py @@ -123,11 +123,18 @@ def __init__( self._logger = self.client._logger self._streaming_setup_thread = None + def _upload(self, testRecord: TestRecord): + + # Side effecting ! + upload_id = upload(api_key=self.api_key, url=self.url, client=self.client)(testRecord) + + if (self.stream): + self._final_update(upload_id, testRecord) + def __enter__(self): # Add upload callback without pausing the logger yet self.test.add_output_callbacks( - upload(api_key=self.api_key, url=self.url, client=self.client), - self._final_update, + self._upload ) # Start streaming setup before pausing the logger @@ -389,14 +396,14 @@ def _handle_answer(self, plug_name, method_name, args): f"Operator UI: Method call failed - {method_name}({', '.join(args)}) - {e}" ) - def _final_update(self, testRecord: TestRecord): + def _final_update(self, upload_id: str, testRecord: TestRecord): """ If the test is fast enough, the watcher never triggers, to avoid the UI being out of sync, we force send at least once at the very end of the test """ - # Skip if streaming is disabled or MQTT client doesn't exist - if not self.stream or self.mqttClient is None: + # Skip if MQTT client doesn't exist + if self.mqttClient is None: return test_record_dict = testRecord.as_base_types() @@ -406,6 +413,7 @@ def _final_update(self, testRecord: TestRecord): "test_record": test_record_dict, "plugs": {"plug_states": {}}, "running_phase_state": {}, + "upload_id": upload_id } self._send_update(test_state_dict) diff --git a/tofupilot/openhtf/upload.py b/tofupilot/openhtf/upload.py index c3e5f5b..7137ff0 100644 --- a/tofupilot/openhtf/upload.py +++ b/tofupilot/openhtf/upload.py @@ -68,7 +68,13 @@ def __init__( self._max_attachments = self.client._max_attachments self._max_file_size = self.client._max_file_size - def __call__(self, test_record: TestRecord): + def __call__(self, test_record: TestRecord) -> str: + """ + Returns: + str: + Id of the initial upload + (This id is present in the ApiCall table of the database) + """ # Resume logger to ensure it's active during attachment processing was_logger_resumed = False if hasattr(self._logger, "resume"): @@ -115,27 +121,16 @@ def __call__(self, test_record: TestRecord): # Extract run_id from response - it could be a string (id) or a dict (result with id field) run_id = None - if isinstance(result, dict): - # It's a dictionary response - if not result.get("success", True): - self._logger.error("Run creation failed, skipping attachments") - return - - # Try to get the ID from the dictionary - run_id = result.get("id") - else: - # Direct ID string - run_id = result - - # Final validation of run_id - if not run_id or not isinstance(run_id, str): - self._logger.error( - f"Invalid run ID received: {run_id}, skipping attachments" - ) - return + if not result.get("success", False): + self._logger.error("Run creation failed, skipping attachments") + return result.get("upload_id", "") + + run_id = result.get("run_id") + original_upload_id: str = result.get("upload_id") + except Exception as e: self._logger.error(f"Error creating run: {str(e)}") - return + return "" finally: # Ensure the file is deleted after processing if os.path.exists(filename): @@ -239,7 +234,9 @@ def __call__(self, test_record: TestRecord): f"Failed to process attachment: {str(e)}" ) continue - finally: - # For attachment logs to be visible, we intentionally don't pause the logger here - # Instead, we'll let the TofuPilot class's __exit__ method handle the logger state - pass + return original_upload_id + except Exception as e: + self._logger.error( + f"Otherwise uncaught exception: {str(e)}" + ) + return "" From 5e018a4eeb55f7680fe5eabbd7944674a64c4674 Mon Sep 17 00:00:00 2001 From: Quentin Bernet Date: Mon, 12 May 2025 13:53:39 +0200 Subject: [PATCH 2/5] fix: incorrect instruction on how to disable streaming on streaming fail --- tofupilot/openhtf/tofupilot.py | 48 +++++++++------------------------- 1 file changed, 13 insertions(+), 35 deletions(-) diff --git a/tofupilot/openhtf/tofupilot.py b/tofupilot/openhtf/tofupilot.py index 135f560..81b3b2d 100644 --- a/tofupilot/openhtf/tofupilot.py +++ b/tofupilot/openhtf/tofupilot.py @@ -203,27 +203,25 @@ def _setup_streaming_with_state(self): def _setup_streaming(self): try: - try: - cred = self.client.get_connection_credentials() - except Exception as e: - self._logger.warning(f"Operator UI: JWT error: {e}") + def display_help_disable_streaming(): # Print with yellow color for consistency with warnings yellow = "\033[0;33m" reset = "\033[0m" print( - f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" + f"{yellow}To disable Operator UI streaming, use TofuPilot(..., stream=False) in your script{reset}" ) + + try: + cred = self.client.get_connection_credentials() + except Exception as e: + self._logger.warning(f"Operator UI: JWT error: {e}") + display_help_disable_streaming() self.stream = False # Disable streaming on auth failure return if not cred: self._logger.warning("Operator UI: Auth server connection failed") - # Print with yellow color for consistency with warnings - yellow = "\033[0;33m" - reset = "\033[0m" - print( - f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" - ) + display_help_disable_streaming() self.stream = False # Disable streaming on auth failure return self @@ -260,12 +258,7 @@ def _setup_streaming(self): self._logger.warning( f"Operator UI: Failed to connect with server (exception): {e}" ) - # Print with yellow color for consistency with warnings - yellow = "\033[0;33m" - reset = "\033[0m" - print( - f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" - ) + display_help_disable_streaming() self.stream = False # Disable streaming on connection failure return @@ -273,12 +266,7 @@ def _setup_streaming(self): self._logger.warning( f"Operator UI: Failed to connect with server (error code): {connect_error_code}" ) - # Print with yellow color for consistency with warnings - yellow = "\033[0;33m" - reset = "\033[0m" - print( - f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" - ) + display_help_disable_streaming() self.stream = False # Disable streaming on connection failure return self @@ -290,12 +278,7 @@ def _setup_streaming(self): self._logger.warning( f"Operator UI: Failed to subscribe to server (exception): {e}" ) - # Print with yellow color for consistency with warnings - yellow = "\033[0;33m" - reset = "\033[0m" - print( - f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" - ) + display_help_disable_streaming() self.stream = False # Disable streaming on subscription failure return @@ -303,12 +286,7 @@ def _setup_streaming(self): self._logger.warning( f"Operator UI: Failed to subscribe to server (error code): {subscribe_error_code}" ) - # Print with yellow color for consistency with warnings - yellow = "\033[0;33m" - reset = "\033[0m" - print( - f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" - ) + display_help_disable_streaming() self.stream = False # Disable streaming on subscription failure return self From 56fb808e5a0b61088a74384da7256fd3fb615637 Mon Sep 17 00:00:00 2001 From: Quentin Bernet Date: Mon, 12 May 2025 14:00:15 +0200 Subject: [PATCH 3/5] fix: client disconnecting after around 1h --- tofupilot/openhtf/tofupilot.py | 199 +++++++++++++++++++-------------- 1 file changed, 116 insertions(+), 83 deletions(-) diff --git a/tofupilot/openhtf/tofupilot.py b/tofupilot/openhtf/tofupilot.py index 81b3b2d..65504a4 100644 --- a/tofupilot/openhtf/tofupilot.py +++ b/tofupilot/openhtf/tofupilot.py @@ -1,3 +1,4 @@ +import time import types from typing import Optional from time import sleep @@ -9,6 +10,7 @@ import paho.mqtt.client as mqtt from paho.mqtt.enums import CallbackAPIVersion +from paho.mqtt.reasoncodes import ReasonCode from openhtf.core.test_record import TestRecord from openhtf.core.test_state import TestState @@ -120,6 +122,7 @@ def __init__( self.update_task = None self.mqttClient = None self.publishOptions = None + self.failed_connection_attempts = 0 self._logger = self.client._logger self._streaming_setup_thread = None @@ -201,94 +204,107 @@ def _setup_streaming_with_state(self): self._setup_streaming() self.connection_completed = True - def _setup_streaming(self): + def _connect_streaming(self) -> str: + + def display_help_disable_streaming(): + # Print with yellow color for consistency with warnings + yellow = "\033[0;33m" + reset = "\033[0m" + print( + f"{yellow}To disable Operator UI streaming, use TofuPilot(..., stream=False) in your script{reset}" + ) + try: - def display_help_disable_streaming(): - # Print with yellow color for consistency with warnings - yellow = "\033[0;33m" - reset = "\033[0m" - print( - f"{yellow}To disable Operator UI streaming, use TofuPilot(..., stream=False) in your script{reset}" - ) + cred = self.client.get_connection_credentials() + except Exception as e: + self._logger.warning(f"Operator UI: JWT error: {e}") + display_help_disable_streaming() + self.stream = False # Disable streaming on auth failure + return "" + + if not cred: + self._logger.warning("Operator UI: Auth server connection failed") + display_help_disable_streaming() + self.stream = False # Disable streaming on auth failure + return "" + + # Since we control the server, we know these will be set + token = cred["token"] + operatorPage = cred["operatorPage"] + clientOptions = cred["clientOptions"] + willOptions = cred["willOptions"] + connectOptions = cred["connectOptions"] + self.publishOptions = cred["publishOptions"] + subscribeOptions = cred["subscribeOptions"] + + self.mqttClient = mqtt.Client( + callback_api_version=CallbackAPIVersion.VERSION2, **clientOptions + ) - try: - cred = self.client.get_connection_credentials() - except Exception as e: - self._logger.warning(f"Operator UI: JWT error: {e}") - display_help_disable_streaming() - self.stream = False # Disable streaming on auth failure - return + # This is not 100% reliable, hence the need to put the setup in the background + # See https://github.com/eclipse-paho/paho.mqtt.python/issues/890 + self.mqttClient.connect_timeout = 1.0 - if not cred: - self._logger.warning("Operator UI: Auth server connection failed") - display_help_disable_streaming() - self.stream = False # Disable streaming on auth failure - return self - - # Since we control the server, we know these will be set - token = cred["token"] - operatorPage = cred["operatorPage"] - clientOptions = cred["clientOptions"] - willOptions = cred["willOptions"] - connectOptions = cred["connectOptions"] - self.publishOptions = cred["publishOptions"] - subscribeOptions = cred["subscribeOptions"] - - self.mqttClient = mqtt.Client( - callback_api_version=CallbackAPIVersion.VERSION2, **clientOptions - ) + self.mqttClient.tls_set() - # This is not 100% reliable, hence the need to put the setup in the background - # See https://github.com/eclipse-paho/paho.mqtt.python/issues/890 - self.mqttClient.connect_timeout = 1.0 + self.mqttClient.will_set(**willOptions) - self.mqttClient.tls_set() + self.mqttClient.username_pw_set("pythonClient", token) - self.mqttClient.will_set(**willOptions) + self.mqttClient.on_message = self._on_message + self.mqttClient.on_connect = self._on_connect + self.mqttClient.on_disconnect = self._on_disconnect + self.mqttClient.on_unsubscribe = self._on_unsubscribe - self.mqttClient.username_pw_set("pythonClient", token) + try: + connect_error_code = self.mqttClient.connect(**connectOptions) + except Exception as e: + self._logger.warning( + f"Operator UI: Failed to connect with server (exception): {e}" + ) + display_help_disable_streaming() + self.stream = False # Disable streaming on connection failure + return "" - self.mqttClient.on_message = self._on_message - self.mqttClient.on_disconnect = self._on_disconnect - self.mqttClient.on_unsubscribe = self._on_unsubscribe + if connect_error_code != mqtt.MQTT_ERR_SUCCESS: + self._logger.warning( + f"Operator UI: Failed to connect with server (error code): {connect_error_code}" + ) + display_help_disable_streaming() + self.stream = False # Disable streaming on connection failure + return "" - try: - connect_error_code = self.mqttClient.connect(**connectOptions) - except Exception as e: - self._logger.warning( - f"Operator UI: Failed to connect with server (exception): {e}" - ) - display_help_disable_streaming() - self.stream = False # Disable streaming on connection failure - return + try: + subscribe_error_code, messageId = self.mqttClient.subscribe( + **subscribeOptions + ) + except Exception as e: + self._logger.warning( + f"Operator UI: Failed to subscribe to server (exception): {e}" + ) + display_help_disable_streaming() + self.stream = False # Disable streaming on subscription failure + return "" - if connect_error_code != mqtt.MQTT_ERR_SUCCESS: - self._logger.warning( - f"Operator UI: Failed to connect with server (error code): {connect_error_code}" - ) - display_help_disable_streaming() - self.stream = False # Disable streaming on connection failure - return self + if subscribe_error_code != mqtt.MQTT_ERR_SUCCESS: + self._logger.warning( + f"Operator UI: Failed to subscribe to server (error code): {subscribe_error_code}" + ) + display_help_disable_streaming() + self.stream = False # Disable streaming on subscription failure + return "" + + return operatorPage + - try: - subscribe_error_code, messageId = self.mqttClient.subscribe( - **subscribeOptions - ) - except Exception as e: - self._logger.warning( - f"Operator UI: Failed to subscribe to server (exception): {e}" - ) - display_help_disable_streaming() - self.stream = False # Disable streaming on subscription failure - return + def _setup_streaming(self): + try: - if subscribe_error_code != mqtt.MQTT_ERR_SUCCESS: - self._logger.warning( - f"Operator UI: Failed to subscribe to server (error code): {subscribe_error_code}" - ) - display_help_disable_streaming() - self.stream = False # Disable streaming on subscription failure - return self + operator_page = self._connect_streaming() + + # We will already have displayed the error message + if operator_page == "": + return self.mqttClient.loop_start() @@ -303,7 +319,7 @@ def display_help_disable_streaming(): # Create clickable URL clickable_url = ( - f"\033]8;;{operatorPage}\033\\{operatorPage}\033]8;;\033\\" + f"\033]8;;{operator_page}\033\\{operator_page}\033]8;;\033\\" ) # Print single line connection message with URL @@ -312,7 +328,7 @@ def display_help_disable_streaming(): except: # Fallback for terminals that don't support ANSI self._logger.success(f"Connected to TofuPilot real-time server") - self._logger.success(f"Access Operator UI: {operatorPage}") + self._logger.success(f"Access Operator UI: {operator_page}") except Exception as e: self._logger.warning(f"Operator UI: Setup error - {e}") @@ -404,13 +420,30 @@ def _on_message(self, client, userdata, message): if parsed["source"] == "web": self._handle_answer(**parsed["message"]) + def _on_connect( + self, client, userdata, connect_flags, reason_code, properties + ): + self.failed_connection_attempts = 0 + def _on_disconnect( - self, client, userdata, disconnect_flags, reason_code, properties + self, client, userdata, disconnect_flags: mqtt.DisconnectFlags, reason_code: ReasonCode, properties ): - if reason_code != mqtt.MQTT_ERR_SUCCESS: - self._logger.warning( - f"Operator UI: Unexpected disconnect (code {reason_code})" - ) + if reason_code != mqtt.MQTT_ERR_SUCCESS or disconnect_flags.is_disconnect_packet_from_server: + # Exponential backoff + if self.failed_connection_attempts > 0: + time.sleep( + 2**self.failed_connection_attempts + ) + self.failed_connection_attempts += 1 + + if self.failed_connection_attempts > 5: + self._logger.warning( + f"Operator UI: Unexpected disconnect (code {reason_code})" + ) + self._connect_streaming() + self.mqttClient.loop_start() + test_state_dict, _ = _to_dict_with_event(self.test.state) + self._send_update(test_state_dict) def _on_unsubscribe(self, client, userdata, mid, reason_code_list, properties): if any( From cfc2a26b623fa45d0e3191adfc43b314727e6181 Mon Sep 17 00:00:00 2001 From: Quentin Bernet Date: Thu, 15 May 2025 10:05:28 +0200 Subject: [PATCH 4/5] fix: issue uploading attachments fix: use of feature unsupported in python 3.9 --- tofupilot/utils/files.py | 59 ++++++++++++++------------------------ tofupilot/utils/network.py | 2 +- 2 files changed, 22 insertions(+), 39 deletions(-) diff --git a/tofupilot/utils/files.py b/tofupilot/utils/files.py index 26cf4a1..f569a15 100644 --- a/tofupilot/utils/files.py +++ b/tofupilot/utils/files.py @@ -102,9 +102,26 @@ def upload_file( def notify_server( - headers: dict, url: str, upload_id: str, run_id: str, logger=None, verify: str | None = None + headers: dict, + url: str, + upload_id: str, + run_id: str, + logger = None, + verify = None, # str | None ) -> bool: - """Tells TP server to sync upload with newly created run""" + """Tells TP server to sync upload with newly created run + + Args: + headers (dict): Request headers including authorization + url (str): Base API URL + upload_id (str): ID of the upload to link + run_id (str): ID of the run to link to + logger (Optional[Logger]): The logger to use + verify (Optional[str]): Path to a CA bundle file to verify the server certificate + + Returns: + bool: True if successful + """ sync_url = f"{url}/uploads/sync" sync_payload = {"upload_id": upload_id, "run_id": run_id} @@ -135,7 +152,7 @@ def upload_attachment_data( data, mimetype: str, run_id: str, - verify: str | None, + verify, #: str | None, ) -> bool: """ Uploads binary data as an attachment and links it to a run @@ -184,40 +201,6 @@ def upload_attachment_data( logger.error(f"Upload failed: {name} - {str(e)}") return False - -def notify_server( - headers: dict, - url: str, - upload_id: str, - run_id: str, - verify: Optional[str] = None, -) -> bool: - """Tells TP server to sync upload with newly created run - - Args: - headers (dict): Request headers including authorization - url (str): Base API URL - upload_id (str): ID of the upload to link - run_id (str): ID of the run to link to - verify (Optional[str]): Path to a CA bundle file to verify the server certificate - - Returns: - bool: True if successful - """ - sync_url = f"{url}/uploads/sync" - sync_payload = {"upload_id": upload_id, "run_id": run_id} - - response = requests.post( - sync_url, - data=json.dumps(sync_payload), - verify=verify, - headers=headers, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - - return response.status_code == 200 - - def upload_attachments( logger: Logger, headers: dict, @@ -282,7 +265,7 @@ def process_openhtf_attachments( max_attachments: int, max_file_size: int, needs_base64_decode: bool = True, - verify: str | None = None, + verify = None, #: str | None = None, ) -> None: """ Process attachments from an OpenHTF test record and upload them. diff --git a/tofupilot/utils/network.py b/tofupilot/utils/network.py index 67378a4..3b0669c 100644 --- a/tofupilot/utils/network.py +++ b/tofupilot/utils/network.py @@ -20,7 +20,7 @@ def api_request( data: Optional[Dict] = None, params: Optional[Dict] = None, timeout: int = SECONDS_BEFORE_TIMEOUT, - verify: str | None = None, + verify = None, #: str | None = None, ) -> Dict: """Unified API request handler with consistent error handling""" try: From f4f57f4b469d02b9301f656589af9667823be3b6 Mon Sep 17 00:00:00 2001 From: Quentin Bernet Date: Thu, 15 May 2025 12:35:48 +0200 Subject: [PATCH 5/5] fix: operator UI disconnectig permanently when internet connection lost --- tofupilot/client.py | 16 ++++--- tofupilot/openhtf/tofupilot.py | 85 ++++++++++++++++------------------ 2 files changed, 48 insertions(+), 53 deletions(-) diff --git a/tofupilot/client.py b/tofupilot/client.py index 51068be..df37e05 100644 --- a/tofupilot/client.py +++ b/tofupilot/client.py @@ -473,8 +473,12 @@ def get_connection_credentials(self) -> dict: Fetches credentials required to livestream test results. Returns: - values: - a dict containing the emqx server url, the topic to connect to, and the JWT token required to connect + a dict containing + "success": + a bool indicating success + "values" if success: + a dict containing the emqx server url, the topic to connect to, and the JWT token required to connect + other fields as set in handle_http_error and handle_network_error """ try: response = requests.get( @@ -485,13 +489,11 @@ def get_connection_credentials(self) -> dict: ) response.raise_for_status() values = handle_response(self._logger, response) - return values + return {"success": True, "values": values} except requests.exceptions.HTTPError as http_err: - handle_http_error(self._logger, http_err) - return None + return handle_http_error(self._logger, http_err) except requests.RequestException as e: - handle_network_error(self._logger, e) - return None + return handle_network_error(self._logger, e) def print_version_banner(current_version: str): diff --git a/tofupilot/openhtf/tofupilot.py b/tofupilot/openhtf/tofupilot.py index 65504a4..c6b310d 100644 --- a/tofupilot/openhtf/tofupilot.py +++ b/tofupilot/openhtf/tofupilot.py @@ -122,7 +122,6 @@ def __init__( self.update_task = None self.mqttClient = None self.publishOptions = None - self.failed_connection_attempts = 0 self._logger = self.client._logger self._streaming_setup_thread = None @@ -204,29 +203,37 @@ def _setup_streaming_with_state(self): self._setup_streaming() self.connection_completed = True - def _connect_streaming(self) -> str: - - def display_help_disable_streaming(): - # Print with yellow color for consistency with warnings - yellow = "\033[0;33m" - reset = "\033[0m" - print( - f"{yellow}To disable Operator UI streaming, use TofuPilot(..., stream=False) in your script{reset}" - ) - - try: - cred = self.client.get_connection_credentials() - except Exception as e: - self._logger.warning(f"Operator UI: JWT error: {e}") - display_help_disable_streaming() - self.stream = False # Disable streaming on auth failure - return "" + def _display_help_disable_streaming(self): + # Print with yellow color for consistency with warnings + yellow = "\033[0;33m" + reset = "\033[0m" + print( + f"{yellow}To disable Operator UI streaming, use TofuPilot(..., stream=False) in your script{reset}" + ) - if not cred: - self._logger.warning("Operator UI: Auth server connection failed") - display_help_disable_streaming() - self.stream = False # Disable streaming on auth failure - return "" + def _connect_streaming(self) -> str: + + res = {"success": False} + while not res.get("success", False): + try: + res = self.client.get_connection_credentials() + except Exception as e: + self._logger.warning(f"Operator UI: JWT error: {e}") + self._display_help_disable_streaming() + time.sleep(1) + + if not res.get("success", False): + status_code = res.get("status_code", 0) + self._logger.warning("Operator UI: Auth server connection failed") + self._display_help_disable_streaming() + + # various flavours of bad request/unauthorized + # We shouldn't retry to connect since it will fail again + if(400 <= status_code <= 407): + return "" + time.sleep(1) + + cred = res["values"] # Since we control the server, we know these will be set token = cred["token"] @@ -252,7 +259,6 @@ def display_help_disable_streaming(): self.mqttClient.username_pw_set("pythonClient", token) self.mqttClient.on_message = self._on_message - self.mqttClient.on_connect = self._on_connect self.mqttClient.on_disconnect = self._on_disconnect self.mqttClient.on_unsubscribe = self._on_unsubscribe @@ -262,7 +268,7 @@ def display_help_disable_streaming(): self._logger.warning( f"Operator UI: Failed to connect with server (exception): {e}" ) - display_help_disable_streaming() + self._display_help_disable_streaming() self.stream = False # Disable streaming on connection failure return "" @@ -270,7 +276,7 @@ def display_help_disable_streaming(): self._logger.warning( f"Operator UI: Failed to connect with server (error code): {connect_error_code}" ) - display_help_disable_streaming() + self._display_help_disable_streaming() self.stream = False # Disable streaming on connection failure return "" @@ -282,7 +288,7 @@ def display_help_disable_streaming(): self._logger.warning( f"Operator UI: Failed to subscribe to server (exception): {e}" ) - display_help_disable_streaming() + self._display_help_disable_streaming() self.stream = False # Disable streaming on subscription failure return "" @@ -290,7 +296,7 @@ def display_help_disable_streaming(): self._logger.warning( f"Operator UI: Failed to subscribe to server (error code): {subscribe_error_code}" ) - display_help_disable_streaming() + self._display_help_disable_streaming() self.stream = False # Disable streaming on subscription failure return "" @@ -332,9 +338,7 @@ def _setup_streaming(self): except Exception as e: self._logger.warning(f"Operator UI: Setup error - {e}") - print( - "To disable Operator UI streaming, use Test(..., stream=False) in your script" - ) + self._display_help_disable_streaming() self.stream = False # Disable streaming on any setup error def _send_update(self, message): @@ -420,26 +424,15 @@ def _on_message(self, client, userdata, message): if parsed["source"] == "web": self._handle_answer(**parsed["message"]) - def _on_connect( - self, client, userdata, connect_flags, reason_code, properties - ): - self.failed_connection_attempts = 0 - def _on_disconnect( self, client, userdata, disconnect_flags: mqtt.DisconnectFlags, reason_code: ReasonCode, properties ): if reason_code != mqtt.MQTT_ERR_SUCCESS or disconnect_flags.is_disconnect_packet_from_server: - # Exponential backoff - if self.failed_connection_attempts > 0: - time.sleep( - 2**self.failed_connection_attempts - ) - self.failed_connection_attempts += 1 - if self.failed_connection_attempts > 5: - self._logger.warning( - f"Operator UI: Unexpected disconnect (code {reason_code})" - ) + self._logger.warning( + f"Operator UI: Unexpected disconnect (code {reason_code})" + ) + self._connect_streaming() self.mqttClient.loop_start() test_state_dict, _ = _to_dict_with_event(self.test.state)