diff --git a/tofupilot/client.py b/tofupilot/client.py index d226f82..df37e05 100644 --- a/tofupilot/client.py +++ b/tofupilot/client.py @@ -224,12 +224,13 @@ def create_run_from_openhtf_report(self, file_path: str): https://www.tofupilot.com/docs/api#create-a-run-from-a-file """ # Upload report and create run from file_path - run_id = self.upload_and_create_from_openhtf_report(file_path) + upload_res = self.upload_and_create_from_openhtf_report(file_path) - # If run_id is not a string, it's an error response dictionary - if not isinstance(run_id, str): + if not upload_res.get("success", False): self._logger.error("OpenHTF import failed") - return run_id + return "" + + run_id = upload_res["run_id"] # Only continue with attachment upload if run_id is valid test_record = None @@ -396,13 +397,12 @@ def delete_unit(self, serial_number: str) -> dict: def upload_and_create_from_openhtf_report( self, file_path: str, - ) -> str: + ) -> Dict: """ Takes a path to an OpenHTF JSON file report, uploads it and creates a run from it. Returns: - str: - Id of the newly created run + Dict """ print("") @@ -449,8 +449,8 @@ def upload_and_create_from_openhtf_report( verify=self._verify, ) - # Return only the ID if successful, otherwise return the full result - if result.get("success", False) is not False: + # Return only the ID if successful, otherwise return the full result + if result.get("success", True) is not False: run_id = result.get("id") run_url = result.get("url") @@ -460,17 +460,25 @@ def upload_and_create_from_openhtf_report( elif run_id: self._logger.success(f"Run imported successfully with ID: {run_id}") - return run_id + return { + "success": True, + "run_id": run_id, + "upload_id": upload_id, + } else: - return result + return {**result, "upload_id": upload_id,} def get_connection_credentials(self) -> dict: """ Fetches credentials required to livestream test results. Returns: - values: - a dict containing the emqx server url, the topic to connect to, and the JWT token required to connect + a dict containing + "success": + a bool indicating success + "values" if success: + a dict containing the emqx server url, the topic to connect to, and the JWT token required to connect + other fields as set in handle_http_error and handle_network_error """ try: response = requests.get( @@ -481,13 +489,11 @@ def get_connection_credentials(self) -> dict: ) response.raise_for_status() values = handle_response(self._logger, response) - return values + return {"success": True, "values": values} except requests.exceptions.HTTPError as http_err: - handle_http_error(self._logger, http_err) - return None + return handle_http_error(self._logger, http_err) except requests.RequestException as e: - handle_network_error(self._logger, e) - return None + return handle_network_error(self._logger, e) def print_version_banner(current_version: str): diff --git a/tofupilot/openhtf/tofupilot.py b/tofupilot/openhtf/tofupilot.py index 3f6a051..c6b310d 100644 --- a/tofupilot/openhtf/tofupilot.py +++ b/tofupilot/openhtf/tofupilot.py @@ -1,3 +1,4 @@ +import time import types from typing import Optional from time import sleep @@ -9,6 +10,7 @@ import paho.mqtt.client as mqtt from paho.mqtt.enums import CallbackAPIVersion +from paho.mqtt.reasoncodes import ReasonCode from openhtf.core.test_record import TestRecord from openhtf.core.test_state import TestState @@ -123,11 +125,18 @@ def __init__( self._logger = self.client._logger self._streaming_setup_thread = None + def _upload(self, testRecord: TestRecord): + + # Side effecting ! + upload_id = upload(api_key=self.api_key, url=self.url, client=self.client)(testRecord) + + if (self.stream): + self._final_update(upload_id, testRecord) + def __enter__(self): # Add upload callback without pausing the logger yet self.test.add_output_callbacks( - upload(api_key=self.api_key, url=self.url, client=self.client), - self._final_update, + self._upload ) # Start streaming setup before pausing the logger @@ -194,116 +203,114 @@ def _setup_streaming_with_state(self): self._setup_streaming() self.connection_completed = True - def _setup_streaming(self): - try: + def _display_help_disable_streaming(self): + # Print with yellow color for consistency with warnings + yellow = "\033[0;33m" + reset = "\033[0m" + print( + f"{yellow}To disable Operator UI streaming, use TofuPilot(..., stream=False) in your script{reset}" + ) + + def _connect_streaming(self) -> str: + + res = {"success": False} + while not res.get("success", False): try: - cred = self.client.get_connection_credentials() + res = self.client.get_connection_credentials() except Exception as e: self._logger.warning(f"Operator UI: JWT error: {e}") - # Print with yellow color for consistency with warnings - yellow = "\033[0;33m" - reset = "\033[0m" - print( - f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" - ) - self.stream = False # Disable streaming on auth failure - return + self._display_help_disable_streaming() + time.sleep(1) - if not cred: + if not res.get("success", False): + status_code = res.get("status_code", 0) self._logger.warning("Operator UI: Auth server connection failed") - # Print with yellow color for consistency with warnings - yellow = "\033[0;33m" - reset = "\033[0m" - print( - f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" - ) - self.stream = False # Disable streaming on auth failure - return self - - # Since we control the server, we know these will be set - token = cred["token"] - operatorPage = cred["operatorPage"] - clientOptions = cred["clientOptions"] - willOptions = cred["willOptions"] - connectOptions = cred["connectOptions"] - self.publishOptions = cred["publishOptions"] - subscribeOptions = cred["subscribeOptions"] - - self.mqttClient = mqtt.Client( - callback_api_version=CallbackAPIVersion.VERSION2, **clientOptions - ) + self._display_help_disable_streaming() + + # various flavours of bad request/unauthorized + # We shouldn't retry to connect since it will fail again + if(400 <= status_code <= 407): + return "" + time.sleep(1) + + cred = res["values"] + + # Since we control the server, we know these will be set + token = cred["token"] + operatorPage = cred["operatorPage"] + clientOptions = cred["clientOptions"] + willOptions = cred["willOptions"] + connectOptions = cred["connectOptions"] + self.publishOptions = cred["publishOptions"] + subscribeOptions = cred["subscribeOptions"] + + self.mqttClient = mqtt.Client( + callback_api_version=CallbackAPIVersion.VERSION2, **clientOptions + ) - # This is not 100% reliable, hence the need to put the setup in the background - # See https://github.com/eclipse-paho/paho.mqtt.python/issues/890 - self.mqttClient.connect_timeout = 1.0 + # This is not 100% reliable, hence the need to put the setup in the background + # See https://github.com/eclipse-paho/paho.mqtt.python/issues/890 + self.mqttClient.connect_timeout = 1.0 - self.mqttClient.tls_set() + self.mqttClient.tls_set() - self.mqttClient.will_set(**willOptions) + self.mqttClient.will_set(**willOptions) - self.mqttClient.username_pw_set("pythonClient", token) + self.mqttClient.username_pw_set("pythonClient", token) - self.mqttClient.on_message = self._on_message - self.mqttClient.on_disconnect = self._on_disconnect - self.mqttClient.on_unsubscribe = self._on_unsubscribe + self.mqttClient.on_message = self._on_message + self.mqttClient.on_disconnect = self._on_disconnect + self.mqttClient.on_unsubscribe = self._on_unsubscribe - try: - connect_error_code = self.mqttClient.connect(**connectOptions) - except Exception as e: - self._logger.warning( - f"Operator UI: Failed to connect with server (exception): {e}" - ) - # Print with yellow color for consistency with warnings - yellow = "\033[0;33m" - reset = "\033[0m" - print( - f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" - ) - self.stream = False # Disable streaming on connection failure - return + try: + connect_error_code = self.mqttClient.connect(**connectOptions) + except Exception as e: + self._logger.warning( + f"Operator UI: Failed to connect with server (exception): {e}" + ) + self._display_help_disable_streaming() + self.stream = False # Disable streaming on connection failure + return "" - if connect_error_code != mqtt.MQTT_ERR_SUCCESS: - self._logger.warning( - f"Operator UI: Failed to connect with server (error code): {connect_error_code}" - ) - # Print with yellow color for consistency with warnings - yellow = "\033[0;33m" - reset = "\033[0m" - print( - f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" - ) - self.stream = False # Disable streaming on connection failure - return self + if connect_error_code != mqtt.MQTT_ERR_SUCCESS: + self._logger.warning( + f"Operator UI: Failed to connect with server (error code): {connect_error_code}" + ) + self._display_help_disable_streaming() + self.stream = False # Disable streaming on connection failure + return "" - try: - subscribe_error_code, messageId = self.mqttClient.subscribe( - **subscribeOptions - ) - except Exception as e: - self._logger.warning( - f"Operator UI: Failed to subscribe to server (exception): {e}" - ) - # Print with yellow color for consistency with warnings - yellow = "\033[0;33m" - reset = "\033[0m" - print( - f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" - ) - self.stream = False # Disable streaming on subscription failure - return + try: + subscribe_error_code, messageId = self.mqttClient.subscribe( + **subscribeOptions + ) + except Exception as e: + self._logger.warning( + f"Operator UI: Failed to subscribe to server (exception): {e}" + ) + self._display_help_disable_streaming() + self.stream = False # Disable streaming on subscription failure + return "" - if subscribe_error_code != mqtt.MQTT_ERR_SUCCESS: - self._logger.warning( - f"Operator UI: Failed to subscribe to server (error code): {subscribe_error_code}" - ) - # Print with yellow color for consistency with warnings - yellow = "\033[0;33m" - reset = "\033[0m" - print( - f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" - ) - self.stream = False # Disable streaming on subscription failure - return self + if subscribe_error_code != mqtt.MQTT_ERR_SUCCESS: + self._logger.warning( + f"Operator UI: Failed to subscribe to server (error code): {subscribe_error_code}" + ) + self._display_help_disable_streaming() + self.stream = False # Disable streaming on subscription failure + return "" + + return operatorPage + + + def _setup_streaming(self): + try: + + operator_page = self._connect_streaming() + + # We will already have displayed the error message + if operator_page == "": + return self.mqttClient.loop_start() @@ -318,7 +325,7 @@ def _setup_streaming(self): # Create clickable URL clickable_url = ( - f"\033]8;;{operatorPage}\033\\{operatorPage}\033]8;;\033\\" + f"\033]8;;{operator_page}\033\\{operator_page}\033]8;;\033\\" ) # Print single line connection message with URL @@ -327,13 +334,11 @@ def _setup_streaming(self): except: # Fallback for terminals that don't support ANSI self._logger.success(f"Connected to TofuPilot real-time server") - self._logger.success(f"Access Operator UI: {operatorPage}") + self._logger.success(f"Access Operator UI: {operator_page}") except Exception as e: self._logger.warning(f"Operator UI: Setup error - {e}") - print( - "To disable Operator UI streaming, use Test(..., stream=False) in your script" - ) + self._display_help_disable_streaming() self.stream = False # Disable streaming on any setup error def _send_update(self, message): @@ -389,14 +394,14 @@ def _handle_answer(self, plug_name, method_name, args): f"Operator UI: Method call failed - {method_name}({', '.join(args)}) - {e}" ) - def _final_update(self, testRecord: TestRecord): + def _final_update(self, upload_id: str, testRecord: TestRecord): """ If the test is fast enough, the watcher never triggers, to avoid the UI being out of sync, we force send at least once at the very end of the test """ - # Skip if streaming is disabled or MQTT client doesn't exist - if not self.stream or self.mqttClient is None: + # Skip if MQTT client doesn't exist + if self.mqttClient is None: return test_record_dict = testRecord.as_base_types() @@ -406,6 +411,7 @@ def _final_update(self, testRecord: TestRecord): "test_record": test_record_dict, "plugs": {"plug_states": {}}, "running_phase_state": {}, + "upload_id": upload_id } self._send_update(test_state_dict) @@ -419,12 +425,18 @@ def _on_message(self, client, userdata, message): self._handle_answer(**parsed["message"]) def _on_disconnect( - self, client, userdata, disconnect_flags, reason_code, properties + self, client, userdata, disconnect_flags: mqtt.DisconnectFlags, reason_code: ReasonCode, properties ): - if reason_code != mqtt.MQTT_ERR_SUCCESS: + if reason_code != mqtt.MQTT_ERR_SUCCESS or disconnect_flags.is_disconnect_packet_from_server: + self._logger.warning( f"Operator UI: Unexpected disconnect (code {reason_code})" ) + + self._connect_streaming() + self.mqttClient.loop_start() + test_state_dict, _ = _to_dict_with_event(self.test.state) + self._send_update(test_state_dict) def _on_unsubscribe(self, client, userdata, mid, reason_code_list, properties): if any( diff --git a/tofupilot/openhtf/upload.py b/tofupilot/openhtf/upload.py index c3e5f5b..7137ff0 100644 --- a/tofupilot/openhtf/upload.py +++ b/tofupilot/openhtf/upload.py @@ -68,7 +68,13 @@ def __init__( self._max_attachments = self.client._max_attachments self._max_file_size = self.client._max_file_size - def __call__(self, test_record: TestRecord): + def __call__(self, test_record: TestRecord) -> str: + """ + Returns: + str: + Id of the initial upload + (This id is present in the ApiCall table of the database) + """ # Resume logger to ensure it's active during attachment processing was_logger_resumed = False if hasattr(self._logger, "resume"): @@ -115,27 +121,16 @@ def __call__(self, test_record: TestRecord): # Extract run_id from response - it could be a string (id) or a dict (result with id field) run_id = None - if isinstance(result, dict): - # It's a dictionary response - if not result.get("success", True): - self._logger.error("Run creation failed, skipping attachments") - return - - # Try to get the ID from the dictionary - run_id = result.get("id") - else: - # Direct ID string - run_id = result - - # Final validation of run_id - if not run_id or not isinstance(run_id, str): - self._logger.error( - f"Invalid run ID received: {run_id}, skipping attachments" - ) - return + if not result.get("success", False): + self._logger.error("Run creation failed, skipping attachments") + return result.get("upload_id", "") + + run_id = result.get("run_id") + original_upload_id: str = result.get("upload_id") + except Exception as e: self._logger.error(f"Error creating run: {str(e)}") - return + return "" finally: # Ensure the file is deleted after processing if os.path.exists(filename): @@ -239,7 +234,9 @@ def __call__(self, test_record: TestRecord): f"Failed to process attachment: {str(e)}" ) continue - finally: - # For attachment logs to be visible, we intentionally don't pause the logger here - # Instead, we'll let the TofuPilot class's __exit__ method handle the logger state - pass + return original_upload_id + except Exception as e: + self._logger.error( + f"Otherwise uncaught exception: {str(e)}" + ) + return "" diff --git a/tofupilot/utils/files.py b/tofupilot/utils/files.py index 26cf4a1..f569a15 100644 --- a/tofupilot/utils/files.py +++ b/tofupilot/utils/files.py @@ -102,9 +102,26 @@ def upload_file( def notify_server( - headers: dict, url: str, upload_id: str, run_id: str, logger=None, verify: str | None = None + headers: dict, + url: str, + upload_id: str, + run_id: str, + logger = None, + verify = None, # str | None ) -> bool: - """Tells TP server to sync upload with newly created run""" + """Tells TP server to sync upload with newly created run + + Args: + headers (dict): Request headers including authorization + url (str): Base API URL + upload_id (str): ID of the upload to link + run_id (str): ID of the run to link to + logger (Optional[Logger]): The logger to use + verify (Optional[str]): Path to a CA bundle file to verify the server certificate + + Returns: + bool: True if successful + """ sync_url = f"{url}/uploads/sync" sync_payload = {"upload_id": upload_id, "run_id": run_id} @@ -135,7 +152,7 @@ def upload_attachment_data( data, mimetype: str, run_id: str, - verify: str | None, + verify, #: str | None, ) -> bool: """ Uploads binary data as an attachment and links it to a run @@ -184,40 +201,6 @@ def upload_attachment_data( logger.error(f"Upload failed: {name} - {str(e)}") return False - -def notify_server( - headers: dict, - url: str, - upload_id: str, - run_id: str, - verify: Optional[str] = None, -) -> bool: - """Tells TP server to sync upload with newly created run - - Args: - headers (dict): Request headers including authorization - url (str): Base API URL - upload_id (str): ID of the upload to link - run_id (str): ID of the run to link to - verify (Optional[str]): Path to a CA bundle file to verify the server certificate - - Returns: - bool: True if successful - """ - sync_url = f"{url}/uploads/sync" - sync_payload = {"upload_id": upload_id, "run_id": run_id} - - response = requests.post( - sync_url, - data=json.dumps(sync_payload), - verify=verify, - headers=headers, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - - return response.status_code == 200 - - def upload_attachments( logger: Logger, headers: dict, @@ -282,7 +265,7 @@ def process_openhtf_attachments( max_attachments: int, max_file_size: int, needs_base64_decode: bool = True, - verify: str | None = None, + verify = None, #: str | None = None, ) -> None: """ Process attachments from an OpenHTF test record and upload them. diff --git a/tofupilot/utils/network.py b/tofupilot/utils/network.py index 67378a4..3b0669c 100644 --- a/tofupilot/utils/network.py +++ b/tofupilot/utils/network.py @@ -20,7 +20,7 @@ def api_request( data: Optional[Dict] = None, params: Optional[Dict] = None, timeout: int = SECONDS_BEFORE_TIMEOUT, - verify: str | None = None, + verify = None, #: str | None = None, ) -> Dict: """Unified API request handler with consistent error handling""" try: