diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 1227cad..5bd9002 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -92,21 +92,22 @@ When the release PR is merged, and a new release is detected by the “Python pu If you need to test a new version of the Python client before making an official release, you can publish it to TestPyPI, a sandbox version of PyPI used for testing package distributions. -1. Build the package locally using: +1. If a previous test package with the exact same version was released, update the version in `setup.py`. For instance, change version="X.Y.Z.dev0" to version="X.Y.Z.dev1". +2. Build the package locally using: ```sh rm -rf dist/* python -m build ``` This will generate distribution files in the dist/ directory. -2. If a previous test package with the exact same version was released, update the version in `setup.py`. For instance, change version="X.Y.Z.dev0" to version="X.Y.Z.dev1". +3. Get a testpypi API key, for example ask your managment if they have one. Then run: ```sh - twine upload --repository testpypi dist/ + twine upload --repository testpypi dist/* ``` -3. To install the new test package, run: +4. To install the new test package, run: ```sh pip install -i https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple/ tofupilot== ``` diff --git a/data/performance-report.pdf b/data/performance-report.pdf deleted file mode 100644 index ce6914b..0000000 Binary files a/data/performance-report.pdf and /dev/null differ diff --git a/data/temperature-map.png b/data/temperature-map.png deleted file mode 100644 index c7eb24b..0000000 Binary files a/data/temperature-map.png and /dev/null differ diff --git a/requirements.txt b/requirements.txt index 37b92e6..b4e5b37 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,8 @@ requests setuptools packaging pytest -websockets \ No newline at end of file +websockets +paho-mqtt +build +twine +certifi>=2023.7.22 \ No newline at end of file diff --git a/setup.py b/setup.py index 22735a1..f5fb9e4 100644 --- a/setup.py +++ b/setup.py @@ -5,25 +5,38 @@ setup( name="tofupilot", - version="1.10.0", + version="1.11.1", packages=find_packages(), - install_requires=["requests", "setuptools", "packaging", "pytest", "websockets"], + install_requires=[ + "requests>=2.25.0", + "setuptools>=50.0.0", + "packaging>=20.0", + "paho-mqtt>=2.0.0", + "sentry-sdk>=1.0.0", + "certifi>=2020.12.5", + ], entry_points={ "pytest11": [ "tofupilot = tofupilot.plugin", # Registering the pytest plugin ], }, - author="TofuPilot", + author="TofuPilot Team", author_email="hello@tofupilot.com", - description="The official Python client for the TofuPilot API", + description="Official Python client for TofuPilot with OpenHTF integration, real-time streaming and file attachment support", license="MIT", - keywords="automatic hardware testing tofupilot", + keywords="automatic hardware testing tofupilot openhtf", long_description=long_description, long_description_content_type="text/markdown", url="https://github.com/tofupilot/python-client", classifiers=[ "Programming Language :: Python :: 3", "Operating System :: OS Independent", + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Intended Audience :: Manufacturing", + "Topic :: Scientific/Engineering", + "Topic :: Software Development :: Testing", + "Topic :: Software Development :: Libraries :: Python Modules", ], python_requires=">=3.9", ) diff --git a/tofupilot/client.py b/tofupilot/client.py index 7aee334..78a69be 100644 --- a/tofupilot/client.py +++ b/tofupilot/client.py @@ -8,8 +8,8 @@ from importlib.metadata import version import json -import base64 import requests +import certifi from .constants import ( ENDPOINT, @@ -29,7 +29,8 @@ handle_response, handle_http_error, handle_network_error, - notify_server, + api_request, + process_openhtf_attachments, ) @@ -55,9 +56,12 @@ def __init__( print_version_banner(self._current_version) self._logger = setup_logger(logging.INFO) + # Configure SSL certificate validation + self._setup_ssl_certificates() + self._api_key = api_key or os.environ.get("TOFUPILOT_API_KEY") if self._api_key is None: - error = "Please set TOFUPILOT_API_KEY environment variable. For more information on how to find or generate a valid API key, visit https://tofupilot.com/docs/user-management#api-key." # pylint: disable=line-too-long + error = "Please set TOFUPILOT_API_KEY environment variable. For more information on how to find or generate a valid API key, visit https://tofupilot.com/docs/user-management#api-key." self._logger.error(error) sys.exit(1) @@ -71,10 +75,21 @@ def __init__( self._max_file_size = FILE_MAX_SIZE check_latest_version(self._logger, self._current_version, "tofupilot") + def _setup_ssl_certificates(self): + """Configure SSL certificate validation using certifi if needed.""" + # Check if SSL_CERT_FILE is already set to a valid path + cert_file = os.environ.get("SSL_CERT_FILE") + if not cert_file or not os.path.isfile(cert_file): + # Use certifi's certificate bundle + certifi_path = certifi.where() + if os.path.isfile(certifi_path): + os.environ["SSL_CERT_FILE"] = certifi_path + self._logger.debug(f"SSL: Using certifi path {certifi_path}") + def _log_request(self, method: str, endpoint: str, payload: Optional[dict] = None): """Logs the details of the HTTP request.""" self._logger.debug( - "%s %s%s with payload: %s", method, self._url, endpoint, payload + "Request: %s %s%s payload=%s", method, self._url, endpoint, payload ) def create_run( # pylint: disable=too-many-arguments,too-many-locals @@ -127,7 +142,8 @@ def create_run( # pylint: disable=too-many-arguments,too-many-locals References: https://www.tofupilot.com/docs/api#create-a-run """ - self._logger.info("Starting run creation...") + print("") + self._logger.info("Creating run...") if attachments is not None: validate_files( @@ -171,35 +187,22 @@ def create_run( # pylint: disable=too-many-arguments,too-many-locals payload["report_variables"] = report_variables self._log_request("POST", "/runs", payload) + result = api_request( + self._logger, "POST", f"{self._url}/runs", self._headers, data=payload + ) - try: - response = requests.post( - f"{self._url}/runs", - json=payload, - headers=self._headers, - timeout=SECONDS_BEFORE_TIMEOUT, - verify=self._verify, + # Upload attachments if run was created successfully + run_id = result.get("id") + if run_id and attachments: + # Ensure logger is active for attachment uploads + if hasattr(self._logger, 'resume'): + self._logger.resume() + + upload_attachments( + self._logger, self._headers, self._url, attachments, run_id ) - response.raise_for_status() - result = handle_response(self._logger, response) - - run_id = result.get("id") - if run_id and attachments: - upload_attachments( - self._logger, - self._headers, - self._url, - attachments, - run_id, - self._verify, - ) - return result - - except requests.exceptions.HTTPError as http_err: - return handle_http_error(self._logger, http_err) - except requests.RequestException as e: - return handle_network_error(self._logger, e) + return result def create_run_from_openhtf_report(self, file_path: str): """ @@ -219,74 +222,53 @@ def create_run_from_openhtf_report(self, file_path: str): # Upload report and create run from file_path run_id = self.upload_and_create_from_openhtf_report(file_path) + # If run_id is not a string, it's an error response dictionary + if not isinstance(run_id, str): + self._logger.error("OpenHTF import failed") + return run_id + + # Only continue with attachment upload if run_id is valid + test_record = None try: with open(file_path, "r", encoding="utf-8") as file: test_record = json.load(file) except FileNotFoundError: - print(f"Error: The file '{file_path}' was not found.") + self._logger.error(f"File not found: {file_path}") + return run_id except json.JSONDecodeError: - print(f"Error: The file '{file_path}' contains invalid JSON.") + self._logger.error(f"Invalid JSON: {file_path}") + return run_id except PermissionError: - print(f"Error: Insufficient permissions to read '{file_path}'.") + self._logger.error(f"Permission denied: {file_path}") + return run_id except Exception as e: - print(f"Unexpected error: {e}") - - if run_id and test_record: - number_of_attachments = 0 - for phase in test_record.get("phases"): - # Keep only max number of attachments - if number_of_attachments >= self._max_attachments: - self._logger.warning( - "Too many attachments, trimming to %d attachments.", - self._max_attachments, - ) - break - for attachment_name, attachment in phase.get("attachments").items(): - number_of_attachments += 1 - - self._logger.info("Uploading %s...", attachment_name) - - # Upload initialization - initialize_url = f"{self._url}/uploads/initialize" - payload = {"name": attachment_name} - - response = requests.post( - initialize_url, - data=json.dumps(payload), - headers=self._headers, - verify=self._verify, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - - response.raise_for_status() - response_json = response.json() - upload_url = response_json.get("uploadUrl") - upload_id = response_json.get("id") - - data = base64.b64decode(attachment["data"]) - - requests.put( - upload_url, - data=data, - headers={ - "Content-Type": attachment["mimetype"] - or "application/octet-stream", # Default to binary if mimetype is missing - }, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - - notify_server( - self._headers, - self._url, - upload_id, - run_id, - self._verify, - ) - - self._logger.success( - "Attachment %s successfully uploaded and linked to run.", - attachment_name, - ) + self._logger.error(f"Error: {e}") + return run_id + + # Now safely proceed with attachment upload + if run_id and test_record and "phases" in test_record: + # Add a visual separator after the run success message + print("") + self._logger.info("Processing attachments from OpenHTF test record") + + # Use the centralized function to process all attachments + process_openhtf_attachments( + self._logger, + self._headers, + self._url, + test_record, + run_id, + self._max_attachments, + self._max_file_size, + needs_base64_decode=True, # JSON attachments need base64 decoding + ) + else: + if not test_record: + self._logger.error("Test record load failed") + elif "phases" not in test_record: + self._logger.error("No phases in test record") + + return run_id def get_runs(self, serial_number: str) -> dict: """ @@ -314,28 +296,12 @@ def get_runs(self, serial_number: str) -> dict: "error": {"message": error_message}, } - self._logger.info( - "Fetching runs for unit with serial number %s...", serial_number - ) + self._logger.info("Fetching runs for: %s", serial_number) params = {"serial_number": serial_number} - self._log_request("GET", "/runs", params) - - try: - response = requests.get( - f"{self._url}/runs", - headers=self._headers, - verify=self._verify, - params=params, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - response.raise_for_status() - return handle_response(self._logger, response) - - except requests.exceptions.HTTPError as http_err: - return handle_http_error(self._logger, http_err) - except requests.RequestException as e: - return handle_network_error(self._logger, e) + return api_request( + self._logger, "GET", f"{self._url}/runs", self._headers, params=params + ) def delete_run(self, run_id: str) -> dict: """ @@ -351,24 +317,11 @@ def delete_run(self, run_id: str) -> dict: References: https://www.tofupilot.com/docs/api#delete-a-run """ - self._logger.info('Starting deletion of run "%s"...', run_id) - + self._logger.info("Deleting run: %s", run_id) self._log_request("DELETE", f"/runs/{run_id}") - - try: - response = requests.delete( - f"{self._url}/runs/{run_id}", - headers=self._headers, - verify=self._verify, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - response.raise_for_status() - return handle_response(self._logger, response) - - except requests.exceptions.HTTPError as http_err: - return handle_http_error(self._logger, http_err) - except requests.RequestException as e: - return handle_network_error(self._logger, e) + return api_request( + self._logger, "DELETE", f"{self._url}/runs/{run_id}", self._headers + ) def update_unit( self, serial_number: str, sub_units: Optional[List[SubUnit]] = None @@ -389,27 +342,16 @@ def update_unit( References: https://www.tofupilot.com/docs/api#update-a-unit """ - self._logger.info('Starting update of unit "%s"...', serial_number) - + self._logger.info("Updating unit: %s", serial_number) payload = {"sub_units": sub_units} - self._log_request("PATCH", f"/units/{serial_number}", payload) - - try: - response = requests.patch( - f"{self._url}/units/{serial_number}", - json=payload, - headers=self._headers, - verify=self._verify, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - response.raise_for_status() - return handle_response(self._logger, response) - - except requests.exceptions.HTTPError as http_err: - return handle_http_error(self._logger, http_err) - except requests.RequestException as e: - return handle_network_error(self._logger, e) + return api_request( + self._logger, + "PATCH", + f"{self._url}/units/{serial_number}", + self._headers, + data=payload, + ) def delete_unit(self, serial_number: str) -> dict: """ @@ -426,24 +368,11 @@ def delete_unit(self, serial_number: str) -> dict: References: https://www.tofupilot.com/docs/api#delete-a-unit """ - self._logger.info('Starting deletion of unit "%s"...', serial_number) - + self._logger.info("Deleting unit: %s", serial_number) self._log_request("DELETE", f"/units/{serial_number}") - - try: - response = requests.delete( - f"{self._url}/units/{serial_number}", - headers=self._headers, - verify=self._verify, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - response.raise_for_status() - return handle_response(self._logger, response) - - except requests.exceptions.HTTPError as http_err: - return handle_http_error(self._logger, http_err) - except requests.RequestException as e: - return handle_network_error(self._logger, e) + return api_request( + self._logger, "DELETE", f"{self._url}/units/{serial_number}", self._headers + ) def upload_and_create_from_openhtf_report( self, @@ -457,7 +386,8 @@ def upload_and_create_from_openhtf_report( Id of the newly created run """ - self._logger.info("Starting run creation...") + print("") + self._logger.info("Importing run...") # Validate report validate_files( @@ -470,9 +400,13 @@ def upload_and_create_from_openhtf_report( self._headers, self._url, file_path, self._verify ) except requests.exceptions.HTTPError as http_err: - return handle_http_error(self._logger, http_err) + error_info = handle_http_error(self._logger, http_err) + # Error already logged by handle_http_error + return error_info except requests.RequestException as e: - return handle_network_error(self._logger, e) + error_info = handle_network_error(self._logger, e) + # Error already logged by handle_network_error + return error_info payload = { "upload_id": upload_id, @@ -483,57 +417,63 @@ def upload_and_create_from_openhtf_report( self._log_request("POST", "/import", payload) - # Create run from file - try: - response = requests.post( - f"{self._url}/import", - json=payload, - headers=self._headers, - verify=self._verify, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - response.raise_for_status() - result = handle_response(self._logger, response) + # Create run from file using unified API request handler + result = api_request( + self._logger, "POST", f"{self._url}/import", self._headers, data=payload + ) + # Return only the ID if successful, otherwise return the full result + if result.get("success", False) is not False: run_id = result.get("id") + run_url = result.get("url") - return run_id + # Explicitly log success with URL if available + if run_url: + self._logger.success(f"Run imported successfully: {run_url}") + elif run_id: + self._logger.success(f"Run imported successfully with ID: {run_id}") - except requests.exceptions.HTTPError as http_err: - return handle_http_error(self._logger, http_err) - except requests.RequestException as e: - return handle_network_error(self._logger, e) + return run_id + else: + return result - def get_websocket_url(self) -> dict: + def get_connection_credentials(self) -> dict: """ - Fetches websocket connection url associated with API Key. + Fetches credentials required to livestream test results. Returns: - str: - Websocket connection URL. + values: + a dict containing the emqx server url, the topic to connect to, and the JWT token required to connect """ - try: response = requests.get( - f"{self._url}/rooms", + f"{self._url}/streaming", headers=self._headers, verify=self._verify, timeout=SECONDS_BEFORE_TIMEOUT, ) response.raise_for_status() values = handle_response(self._logger, response) - url = values.get("url") - return url - + return values except requests.exceptions.HTTPError as http_err: - return handle_http_error(self._logger, http_err) + handle_http_error(self._logger, http_err) + return None except requests.RequestException as e: - return handle_network_error(self._logger, e) + handle_network_error(self._logger, e) + return None def print_version_banner(current_version: str): - """Prints current version of client""" - banner = f""" - TofuPilot Python Client {current_version} - """ - print(banner.strip()) + """Prints current version of client with tofu art""" + # Colors for the tofu art + yellow = "\033[33m" # Yellow for the plane + blue = "\033[34m" # Blue for the cap border + reset = "\033[0m" # Reset color + + banner = ( + f"{blue}╭{reset} {yellow}✈{reset} {blue}╮{reset}\n" + f"[•ᴗ•] TofuPilot Python Client {current_version}\n" + "\n" + ) + + print(banner, end="") diff --git a/tofupilot/openhtf/README.md b/tofupilot/openhtf/README.md new file mode 100644 index 0000000..d56dad5 --- /dev/null +++ b/tofupilot/openhtf/README.md @@ -0,0 +1,116 @@ +# TofuPilot OpenHTF Integration + +This module provides integration between OpenHTF and TofuPilot, enhancing the testing experience with features like: + +- Automatic uploading of test results to TofuPilot +- Real-time streaming of test execution +- Enhanced user prompts with TofuPilot operator URLs displayed in the console +- Graceful error handling for test interruptions + +## Quick Start + +```python +from openhtf import Test +from tofupilot.openhtf import TofuPilot, execute_with_graceful_exit + +def main(): + test = Test(*your_phases, procedure_id="FVT1") + + # Stream real-time test execution data to TofuPilot + with TofuPilot(test): + # Use helper function for graceful Ctrl+C handling + result = execute_with_graceful_exit(test, test_start=lambda: "SN15") + + if result is None: + print("Test was interrupted. Exiting gracefully.") + else: + print(f"Test completed with outcome: {result.outcome.name}") +``` + +## Enhanced Prompt Functionality + +TofuPilot enhances OpenHTF's prompt system by displaying the TofuPilot URL before each prompt. + +### URL Display + +The TofuPilot URL is displayed clearly in the console before each prompt: +``` +📱 View live test results: https://tofupilot.example.com/test/123 +Enter a DUT ID in order to start the test. +``` + +This URL display is kept separate from the actual prompt text to maintain clean prompts in the web UI. + +### Using Enhanced Prompts + +There are two main ways to use the enhanced prompts: + +1. **Use the provided prompt functions**: + ```python + from tofupilot.openhtf import prompt_with_tofupilot_url + + response = prompt_with_tofupilot_url( + "Enter calibration value:", + operator_page_url="https://tofupilot.example.com/test/123" + ) + ``` + +2. **Use the `patch_openhtf_prompts` function** to enhance all OpenHTF prompts: + ```python + from tofupilot.openhtf import patch_openhtf_prompts + + # Call this early in your application + patch_openhtf_prompts(tofupilot_url="https://tofupilot.example.com/test/123") + ``` + +## Graceful Error Handling + +TofuPilot provides a helper function to gracefully handle interruptions during test execution. + +### Using execute_with_graceful_exit + +```python +from tofupilot.openhtf import execute_with_graceful_exit + +# Inside your with TofuPilot(test) block: +result = execute_with_graceful_exit(test, test_start=your_test_start_fn) + +# Only show success message if test wasn't interrupted +if result is not None: + print(f"Test completed with outcome: {result.outcome.name}") +``` + +This helper: +- Shows immediate feedback when Ctrl+C is pressed +- Displays "Test execution interrupted by user. Test was interrupted. Exiting gracefully." +- Properly handles KeyboardInterrupt exceptions +- Returns None if the test was interrupted +- Ensures clean resource release +- Prevents stack traces from appearing when the user presses Ctrl+C + +## OpenHTF Output Callbacks + +By default, the TofuPilot context manager automatically adds an output callback to upload test results to TofuPilot upon test completion: + +```python +with TofuPilot(test): + # This will automatically upload test results when complete + test.execute(test_start=lambda: "SN15") +``` + +If you want to manually add the callback: + +```python +from tofupilot.openhtf import upload + +test = Test(*your_phases) +test.add_output_callbacks(upload()) +test.execute(test_start=lambda: "SN15") +``` + +## Important Notes + +1. TofuPilot URL information is displayed in the console log, not in the prompt itself. +2. When using `execute_with_graceful_exit`, interrupted tests will return `None` instead of a test result. +3. The TofuPilot context manager handles automatic upload of test results. +4. For OpenHTF tests that are interrupted, the standard OpenHTF output callbacks will still run. \ No newline at end of file diff --git a/tofupilot/openhtf/__init__.py b/tofupilot/openhtf/__init__.py index 8f9ff42..446ace8 100644 --- a/tofupilot/openhtf/__init__.py +++ b/tofupilot/openhtf/__init__.py @@ -1,10 +1,27 @@ -""" -This module handles all TofuPilot methods related to integration with OpenHTF. +"""TofuPilot integration with OpenHTF. -It provides two main classes: -1. tofupilot.upload(): A way to interface with OpenHTF test scripts to automatically upload test results to the TofuPilot server. -2. tofupilot.TofuPilot(): A way to stream real-time execution data of OpenHTF tests to TofuPilot for live monitoring. +Core functionality: +1. upload(): Upload OpenHTF test results to TofuPilot +2. TofuPilot(): Stream real-time test execution data for monitoring +3. Enhanced prompts: Display interactive TofuPilot prompts in terminal + - Bold question text with [User Input] prefix + - Clickable TofuPilot URLs in the terminal + - Instructions for terminal and web UI input options + - Graceful Ctrl+C handling with result upload """ from .upload import upload from .tofupilot import TofuPilot +from .custom_prompt import ( + patch_openhtf_prompts, + prompt_with_tofupilot_url, + enhanced_prompt_for_test_start, +) + +__all__ = [ + 'TofuPilot', + 'upload', + 'patch_openhtf_prompts', + 'prompt_with_tofupilot_url', + 'enhanced_prompt_for_test_start', +] diff --git a/tofupilot/openhtf/custom_prompt.py b/tofupilot/openhtf/custom_prompt.py new file mode 100644 index 0000000..9e25b0c --- /dev/null +++ b/tofupilot/openhtf/custom_prompt.py @@ -0,0 +1,199 @@ +"""Custom OpenHTF prompt integration with TofuPilot. + +Enhances OpenHTF prompts with TofuPilot features: +- Clickable URLs in the console +- Bold question text +- Consistent visual formatting +- Compatible with TofuPilot web UI streaming + +Prompt format: +- [User Input] QUESTION TEXT (bold) +- Waiting for user input on TofuPilot Operator UI (clickable) or in terminal below. +- Press Ctrl+C to cancel and upload results. (muted) +- Standard OpenHTF prompt (-->) + +Note: The message appears twice in the console to ensure web UI compatibility. +""" + +import logging +from typing import Text, Optional, Union, Callable + +import openhtf +from openhtf import plugs +from openhtf.plugs.user_input import UserInput + +_LOG = logging.getLogger(__name__) + +def prompt_with_tofupilot_url( + message: Text, + operator_page_url: Optional[Text] = None, + text_input: bool = False, + timeout_s: Union[int, float, None] = None, + cli_color: Text = '', + image_url: Optional[Text] = None) -> Text: + """Enhanced prompt that displays TofuPilot URL in the console. + + Args: + message: A string to be presented to the user. + operator_page_url: URL to the TofuPilot operator page. + text_input: A boolean indicating whether the user must respond with text. + timeout_s: Seconds to wait before raising a PromptUnansweredError. + cli_color: An ANSI color code, or the empty string. + image_url: Optional image URL to display or None. + + Returns: + A string response, or the empty string if text_input was False. + """ + import sys + + # Get the UserInput plug instance + prompts = plugs.get_plug_instance(UserInput) + + # Print URL and instructions directly to console before the prompt + # This way they won't appear in the web UI + if operator_page_url: + # Create clickable URL if in terminal that supports it + try: + # Create clickable URL with ANSI escape sequences + # Make both "TofuPilot Operator UI" clickable + clickable_text = f"\033]8;;{operator_page_url}\033\\TofuPilot Operator UI\033]8;;\033\\" + sys.stdout.write(f"\n[User Input] \033[1m{message}\033[0m\n") + sys.stdout.write(f"Waiting for user input on {clickable_text} or in terminal below.\n") + # sys.stdout.write("\033[2mPress Ctrl+C to cancel and upload results.\033[0m\n\n") + sys.stdout.flush() + except: + # Fallback if terminal doesn't support ANSI sequences + print(f"\n[User Input] {message}") + print(f"Waiting for user input on TofuPilot Operator UI or in terminal below.") + # print("Press Ctrl+C to cancel and upload results.\n") + + # Store original message and use it for web UI compatibility + original_msg = message + + # Use dim/muted text for the prompt + if not cli_color: + cli_color = '\033[2m' + return prompts.prompt( + original_msg, + text_input=text_input, + timeout_s=timeout_s, + cli_color=cli_color, + image_url=image_url + ) + + +def enhanced_prompt_for_test_start( + operator_page_url: Optional[Text] = None, + message: Text = 'Enter a DUT ID in order to start the test.', + timeout_s: Union[int, float, None] = 60 * 60 * 24, + validator: Callable[[Text], Text] = lambda sn: sn) -> openhtf.PhaseDescriptor: + """Returns an OpenHTF phase that displays TofuPilot URL in the console. + + Args: + operator_page_url: URL to the TofuPilot operator page. + message: The message to display to the user. + timeout_s: Seconds to wait before raising a PromptUnansweredError. + validator: Function used to validate or modify the serial number. + cli_color: An ANSI color code, or the empty string. + """ + + @openhtf.PhaseOptions(timeout_s=timeout_s) + @plugs.plug(prompts=UserInput) + def trigger_phase(test: openhtf.TestApi, prompts: UserInput) -> None: + """Test start trigger with TofuPilot URL displayed in console.""" + import sys + + # Print URL and instructions directly to console before the prompt + # This way they won't appear in the web UI + if operator_page_url: + # Create clickable URL if in terminal that supports it + try: + # Create clickable URL with ANSI escape sequences + # Make both "TofuPilot Operator UI" clickable + clickable_text = f"\033]8;;{operator_page_url}\033\\TofuPilot Operator UI\033]8;;\033\\" + sys.stdout.write(f"\n[User Input] \033[1m{message}\033[0m\n") + sys.stdout.write(f"Waiting for user input on {clickable_text} or in terminal below.\n") + sys.stdout.write("\033[2mPress Ctrl+C to cancel and upload results.\033[0m\n\n") + sys.stdout.flush() + except: + # Fallback if terminal doesn't support ANSI sequences + print(f"\n[User Input] {message}") + print(f"Waiting for user input on TofuPilot Operator UI or in terminal below.") + # print("Press Ctrl+C to cancel and upload results.\n") + + # Store original message and use it for web UI compatibility + original_msg = message + + # Use dim/muted text for the prompt + if not cli_color: + cli_color = '\033[2m' + dut_id = prompts.prompt( + original_msg, + text_input=True, + timeout_s=timeout_s, + cli_color=cli_color + ) + + # Apply validator and set DUT ID + test.test_record.dut_id = validator(dut_id) + + return trigger_phase + + +# Monkey-patching function to include TofuPilot URL in prompts +original_prompt = UserInput.prompt + +def patched_prompt(self, message, text_input=False, timeout_s=None, cli_color='', image_url=None, + tofupilot_url=None): + """Patched prompt method that displays TofuPilot URL in console.""" + import sys + + # Store original message to use in web UI + original_msg = message + + # Print URL and instructions directly to console before the prompt + # This way they won't appear in the web UI + if tofupilot_url: + # Create clickable URL if in terminal that supports it + try: + # Create clickable URL with ANSI escape sequences + # Make both "TofuPilot Operator UI" clickable + clickable_text = f"\033]8;;{tofupilot_url}\033\\TofuPilot Operator UI\033]8;;\033\\" + sys.stdout.write(f"\n[User Input] \033[1m{message}\033[0m\n") + sys.stdout.write(f"Waiting for user input on {clickable_text} or in terminal below.\n") + # sys.stdout.write("\033[2mPress Ctrl+C to stop and upload run.\033[0m\n\n") + sys.stdout.flush() + + except: + # Fallback if terminal doesn't support ANSI sequences + print(f"\n[User Input] {message}") + print(f"Waiting for user input on TofuPilot Operator UI or in terminal below.") + # print("Press Ctrl+C to cancel and upload results.\n") + + # Override cli_color to make the OpenHTF prompt appear dimmed + # This works because the cli_color is applied to the prompt arrow + if not cli_color: + cli_color = '\033[2m' # Use dim/muted text if no color specified + + # Use original message for web UI compatibility + return original_prompt(self, original_msg, text_input, timeout_s, cli_color, image_url) + + +def patch_openhtf_prompts(tofupilot_url=None): + """Monkey-patch OpenHTF's UserInput class to display TofuPilot URL. + + This function should be called early in your application to ensure all prompts + show the TofuPilot URL in the console (not in the prompt text itself). + + Args: + tofupilot_url: URL to the TofuPilot operator page. + """ + if tofupilot_url: + # Store URL in UserInput class for access by all instances + UserInput.tofupilot_url = tofupilot_url + + # Monkey-patch the prompt method + UserInput.prompt = lambda self, message, text_input=False, timeout_s=None, cli_color='', image_url=None: \ + patched_prompt(self, message, text_input, timeout_s, cli_color, image_url, tofupilot_url) + else: + _LOG.debug("No TofuPilot URL provided for prompt enhancement") \ No newline at end of file diff --git a/tofupilot/openhtf/tofupilot.py b/tofupilot/openhtf/tofupilot.py index 649bf06..d75510f 100644 --- a/tofupilot/openhtf/tofupilot.py +++ b/tofupilot/openhtf/tofupilot.py @@ -1,15 +1,19 @@ +import types from typing import Optional from time import sleep import threading -import asyncio import json from openhtf import Test from openhtf.util import data -from websockets import connect, ConnectionClosedError, InvalidURI +import paho.mqtt.client as mqtt +from paho.mqtt.enums import CallbackAPIVersion +from openhtf.core.test_record import TestRecord +from openhtf.core.test_state import TestState from .upload import upload +from .custom_prompt import patch_openhtf_prompts from ..client import TofuPilotClient @@ -32,7 +36,7 @@ def _get_executing_test(): return test, test_state -def _to_dict_with_event(test_state): +def _to_dict_with_event(test_state: TestState): """Process a test state into the format we want to send to the frontend.""" original_dict, event = test_state.asdict_with_event() @@ -76,7 +80,7 @@ def stop(self): class TofuPilot: """ Context manager to automatically add an output callback to the running OpenHTF test - and live stream it's execution. + and live stream it's execution to the Operator UI. ### Usage Example: @@ -90,16 +94,20 @@ class TofuPilot: def main(): test = Test(*your_phases, procedure_id="FVT1") - # Stream real-time test execution data to TofuPilot + # Stream real-time test execution data to TofuPilot Operator UI with TofuPilot(test): - test.execute(lambda: "SN15") + # For more reliable Ctrl+C handling, use the helper function: + execute_with_graceful_exit(test, test_start=lambda: "SN15") + + # Or use the standard method (may show errors on Ctrl+C): + # test.execute(lambda: "SN15") ``` """ def __init__( self, test: Test, - stream: Optional[bool] = True, + stream: Optional[bool] = True, # Controls connection to Operator UI api_key: Optional[str] = None, url: Optional[str] = None, ): @@ -108,125 +116,324 @@ def __init__( self.client = TofuPilotClient(api_key=api_key, url=url) self.api_key = api_key self.url = url - self.loop = None - self.update_queue = None - self.event_loop_thread = None self.watcher = None self.shutdown_event = threading.Event() self.update_task = None + self.mqttClient = None + self.publishOptions = None + self._logger = self.client._logger + self._streaming_setup_thread = None def __enter__(self): - # Initialize a thread-safe asyncio.Queue + # Add upload callback without pausing the logger yet self.test.add_output_callbacks( - upload(api_key=self.api_key, url=self.url, client=self.client) + upload(api_key=self.api_key, url=self.url, client=self.client), + self._final_update, ) + # Start streaming setup before pausing the logger if self.stream: - self.update_queue = asyncio.Queue() + self.connection_completed = False - # Start the event loop in a separate thread - self.event_loop_thread = threading.Thread( - target=self.run_event_loop, daemon=True + # Start connection in a separate thread with 1s timeout + self._streaming_setup_thread = threading.Thread( + target=self._setup_streaming_with_state, daemon=True ) - self.event_loop_thread.start() - - # Wait until the event loop is ready - while self.loop is None: - sleep(0.1) - - # Start the SimpleStationWatcher with a callback to send updates - self.watcher = SimpleStationWatcher(self.send_update) - self.watcher.start() + self._streaming_setup_thread.start() + self._streaming_setup_thread.join(1.0) + # Pause logger after connection attempt is either completed or timed out + self._logger.pause() return self def __exit__(self, exc_type, exc_value, traceback): + """Clean up resources when exiting the context manager. + + This method handles proper cleanup even in the case of KeyboardInterrupt + or other exceptions to ensure resources are released properly. + """ + self._logger.resume() + + # Handle ongoing connection attempt + if self._streaming_setup_thread and self._streaming_setup_thread.is_alive(): + if ( + not hasattr(self, "connection_completed") + or not self.connection_completed + ): + self._logger.warning(f"Operator UI: Connection still in progress") + self._streaming_setup_thread.join(timeout=3.0) + if self._streaming_setup_thread.is_alive(): + self._logger.warning(f"Operator UI: Connection timed out") + # Stop the StationWatcher if self.watcher: - self.watcher.stop() - self.watcher.join() - - # Schedule the shutdown coroutine - if self.loop and not self.loop.is_closed(): - asyncio.run_coroutine_threadsafe(self.shutdown(), self.loop) - - # Wait for the event loop thread to finish - if self.event_loop_thread: - self.event_loop_thread.join() - - def send_update(self, message): - """Thread-safe method to send a message to the event loop.""" - if self.loop and not self.loop.is_closed(): - asyncio.run_coroutine_threadsafe(self.update_queue.put(message), self.loop) - - def run_event_loop(self): - """Runs the asyncio event loop in a separate thread.""" - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) - self.loop.run_until_complete(self.setup()) - self.loop.run_forever() - - async def setup(self): - """Starts the update processor.""" - # Start the coroutine that processes updates - self.update_task = asyncio.create_task(self.process_updates()) - - async def process_updates(self): - """ - Sends the current state of the test to the WebSocket server. - """ + try: + self.watcher.stop() + self.watcher.join(timeout=2.0) # Add timeout to prevent hanging + except Exception as e: + self._logger.warning(f"Error stopping watcher: {e}") + + # Clean up MQTT connection + if self.mqttClient: + try: + # Doesn't wait for publish operation to stop, this is fine since __exit__ is only called after the run was imported + self.mqttClient.loop_stop() + self.mqttClient.disconnect() + except Exception as e: + self._logger.warning(f"Error disconnecting MQTT client: {e}") + finally: + self.mqttClient = None + + # Return False to allow any exception to propagate, unless it's a KeyboardInterrupt + # In case of KeyboardInterrupt, return True to suppress the exception + return exc_type is KeyboardInterrupt + + # Operator UI-related methods + + def _setup_streaming_with_state(self): + """Run the streaming setup and track connection completion state.""" + self._setup_streaming() + self.connection_completed = True + + def _setup_streaming(self): try: - url = self.client.get_websocket_url() - - if not url: - return # Exit gracefully if no URL is provided - - retry_count = 0 - max_retries = 5 - backoff_factor = 2 # Exponential backoff base - - while retry_count < max_retries and not self.shutdown_event.is_set(): - try: - async with connect(url) as websocket: - while not self.shutdown_event.is_set(): - try: - # Fetch state update from the queue (with timeout to avoid blocking indefinitely) - state_update = await asyncio.wait_for( - self.update_queue.get(), timeout=1.0 - ) - # Send the state update to the WebSocket server - await websocket.send( - json.dumps( - {"action": "send", "message": state_update} - ) - ) - except asyncio.TimeoutError: - continue # Timeout waiting for an update; loop back - except asyncio.CancelledError: - return # Exit cleanly on task cancellation - except Exception: # pylint: disable=broad-exception-caught - break # Exit WebSocket loop on unexpected errors - except (ConnectionClosedError, OSError, InvalidURI): - retry_count += 1 - await asyncio.sleep( - backoff_factor**retry_count - ) # Exponential backoff - except asyncio.CancelledError: - return # Exit cleanly on task cancellation - except Exception: # pylint: disable=broad-exception-caught - break # Exit gracefully on unexpected errors - except Exception: # pylint: disable=broad-exception-caught - pass # Catch all remaining exceptions to ensure robustness - - async def shutdown(self): - """Cleans up resources and stops the event loop.""" - # Cancel the update task - if self.update_task is not None: - self.update_task.cancel() try: - await self.update_task - except asyncio.CancelledError: - pass + cred = self.client.get_connection_credentials() + except Exception as e: + self._logger.warning(f"Operator UI: JWT error: {e}") + # Print with yellow color for consistency with warnings + yellow = "\033[0;33m" + reset = "\033[0m" + print( + f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" + ) + self.stream = False # Disable streaming on auth failure + return + + if not cred: + self._logger.warning("Operator UI: Auth server connection failed") + # Print with yellow color for consistency with warnings + yellow = "\033[0;33m" + reset = "\033[0m" + print( + f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" + ) + self.stream = False # Disable streaming on auth failure + return self + + # Since we control the server, we know these will be set + token = cred["token"] + operatorPage = cred["operatorPage"] + clientOptions = cred["clientOptions"] + willOptions = cred["willOptions"] + connectOptions = cred["connectOptions"] + self.publishOptions = cred["publishOptions"] + subscribeOptions = cred["subscribeOptions"] + + self.mqttClient = mqtt.Client( + callback_api_version=CallbackAPIVersion.VERSION2, **clientOptions + ) + + # This is not 100% reliable, hence the need to put the setup in the background + # See https://github.com/eclipse-paho/paho.mqtt.python/issues/890 + self.mqttClient.connect_timeout = 1.0 + + self.mqttClient.tls_set() + + self.mqttClient.will_set(**willOptions) + + self.mqttClient.username_pw_set("pythonClient", token) + + self.mqttClient.on_message = self._on_message + self.mqttClient.on_disconnect = self._on_disconnect + self.mqttClient.on_unsubscribe = self._on_unsubscribe + + try: + connect_error_code = self.mqttClient.connect(**connectOptions) + except Exception as e: + self._logger.warning( + f"Operator UI: Failed to connect with server (exception): {e}" + ) + # Print with yellow color for consistency with warnings + yellow = "\033[0;33m" + reset = "\033[0m" + print( + f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" + ) + self.stream = False # Disable streaming on connection failure + return + + if connect_error_code != mqtt.MQTT_ERR_SUCCESS: + self._logger.warning( + f"Operator UI: Failed to connect with server (error code): {connect_error_code}" + ) + # Print with yellow color for consistency with warnings + yellow = "\033[0;33m" + reset = "\033[0m" + print( + f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" + ) + self.stream = False # Disable streaming on connection failure + return self + + try: + subscribe_error_code, messageId = self.mqttClient.subscribe( + **subscribeOptions + ) + except Exception as e: + self._logger.warning( + f"Operator UI: Failed to subscribe to server (exception): {e}" + ) + # Print with yellow color for consistency with warnings + yellow = "\033[0;33m" + reset = "\033[0m" + print( + f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" + ) + self.stream = False # Disable streaming on subscription failure + return + + if subscribe_error_code != mqtt.MQTT_ERR_SUCCESS: + self._logger.warning( + f"Operator UI: Failed to subscribe to server (error code): {subscribe_error_code}" + ) + # Print with yellow color for consistency with warnings + yellow = "\033[0;33m" + reset = "\033[0m" + print( + f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" + ) + self.stream = False # Disable streaming on subscription failure + return self + + self.mqttClient.loop_start() + + self.watcher = SimpleStationWatcher(self._send_update) + self.watcher.start() + + # Apply the patch to OpenHTF prompts to include the TofuPilot URL + patch_openhtf_prompts(operatorPage) + + # Show connection status message with URL + try: + # Use ANSI escape sequence for clickable link + green = "\033[0;32m" + reset = "\033[0m" + + # Create clickable URL + clickable_url = ( + f"\033]8;;{operatorPage}\033\\{operatorPage}\033]8;;\033\\" + ) + + # Print single line connection message with URL + print(f"\n{green}Connected to TofuPilot real-time server{reset}") + print(f"{green}Access Operator UI: {clickable_url}{reset}\n") + except: + # Fallback for terminals that don't support ANSI + self._logger.success(f"Connected to TofuPilot real-time server") + self._logger.success(f"Access Operator UI: {operatorPage}") + + except Exception as e: + self._logger.warning(f"Operator UI: Setup error - {e}") + print( + "To disable Operator UI streaming, use Test(..., stream=False) in your script" + ) + self.stream = False # Disable streaming on any setup error + + def _send_update(self, message): + # Skip publishing if streaming is disabled or client is None + if not self.stream or self.mqttClient is None: + return + + try: + self.mqttClient.publish( + payload=json.dumps( + {"action": "send", "source": "python", "message": message} + ), + **self.publishOptions, + ) + except Exception as e: + self._logger.warning( + f"Operator UI: Failed to publish to server (exception): {e}" + ) + self.stream = False # Disable streaming on publish failure + return + + def _handle_answer(self, plug_name, method_name, args): + _, test_state = _get_executing_test() + + if test_state is None: + self._logger.warning("Operator UI: No running test found") + return + + # Find the plug matching `plug_name`. + plug = test_state.plug_manager.get_plug_by_class_path(plug_name) + if plug is None: + self._logger.warning(f"Operator UI: Plug not found - {plug_name}") + return + + method = getattr(plug, method_name, None) + + if not ( + plug.enable_remote + and isinstance(method, types.MethodType) + and not method_name.startswith("_") + and method_name not in plug.disable_remote_attrs + ): + self._logger.warning( + f"Operator UI: Method not found - {plug_name}.{method_name}" + ) + return - # Stop the event loop - self.loop.stop() + try: + # side-effecting ! + method(*args) + except Exception as e: # pylint: disable=broad-except + self._logger.warning( + f"Operator UI: Method call failed - {method_name}({', '.join(args)}) - {e}" + ) + + def _final_update(self, testRecord: TestRecord): + """ + If the test is fast enough, the watcher never triggers, to avoid the UI being out of sync, + we force send at least once at the very end of the test + """ + + # Skip if streaming is disabled or MQTT client doesn't exist + if not self.stream or self.mqttClient is None: + return + + test_record_dict = testRecord.as_base_types() + + test_state_dict = { + "status": "COMPLETED", + "test_record": test_record_dict, + "plugs": {"plug_states": {}}, + "running_phase_state": {}, + } + + self._send_update(test_state_dict) + + # Operator UI-related callbacks + + def _on_message(self, client, userdata, message): + parsed = json.loads(message.payload) + + if parsed["source"] == "web": + self._handle_answer(**parsed["message"]) + + def _on_disconnect( + self, client, userdata, disconnect_flags, reason_code, properties + ): + if reason_code != mqtt.MQTT_ERR_SUCCESS: + self._logger.warning( + f"Operator UI: Unexpected disconnect (code {reason_code})" + ) + + def _on_unsubscribe(self, client, userdata, mid, reason_code_list, properties): + if any( + reason_code != mqtt.MQTT_ERR_SUCCESS for reason_code in reason_code_list + ): + self._logger.warning( + f"Operator UI: Partial disconnect (codes {reason_code_list})" + ) diff --git a/tofupilot/openhtf/upload.py b/tofupilot/openhtf/upload.py index 56737b6..c3e5f5b 100644 --- a/tofupilot/openhtf/upload.py +++ b/tofupilot/openhtf/upload.py @@ -15,6 +15,7 @@ from ..utils import ( notify_server, ) +from ..utils.logger import LoggerStateManager class upload: # pylint: disable=invalid-name @@ -63,111 +64,182 @@ def __init__( self._logger = self.client._logger self._url = self.client._url self._headers = self.client._headers - self._verify = verify + self._verify = verify # Kept for backward compatibility self._max_attachments = self.client._max_attachments self._max_file_size = self.client._max_file_size def __call__(self, test_record: TestRecord): - - # Extract relevant details from the test record - dut_id = test_record.dut_id - test_name = test_record.metadata.get("test_name") - - # Convert milliseconds to a datetime object - start_time = datetime.datetime.fromtimestamp( - test_record.start_time_millis / 1000.0 - ) - - # Format the timestamp as YYYY-MM-DD_HH_MM_SS_SSS - start_time_formatted = start_time.strftime("%Y-%m-%d_%H-%M-%S-%f")[:-3] - - temp_dir = tempfile.gettempdir() - - # Craft system-agnostic temporary filename - filename = os.path.join( - temp_dir, f"{dut_id}.{test_name}.{start_time_formatted}.json" - ) - - # Use the existing OutputToJSON callback to write to the custom file - output_callback = json_factory.OutputToJSON( - filename, - inline_attachments=False, # Exclude raw attachments - allow_nan=self.allow_nan, - indent=4, - ) - - # Open the custom file and write serialized test record to it - with open(filename, "w", encoding="utf-8") as file: - for json_line in output_callback.serialize_test_record(test_record): - file.write(json_line) + # Resume logger to ensure it's active during attachment processing + was_logger_resumed = False + if hasattr(self._logger, "resume"): + self._logger.resume() + was_logger_resumed = True try: - # Call create_run_from_report with the generated file path - run_id = self.client.upload_and_create_from_openhtf_report(filename) - finally: - # Ensure the file is deleted after processing - if os.path.exists(filename): - os.remove(filename) - - if run_id: + # Extract relevant details from the test record + dut_id = test_record.dut_id + test_name = test_record.metadata.get("test_name") + + # Convert milliseconds to a datetime object + start_time = datetime.datetime.fromtimestamp( + test_record.start_time_millis / 1000.0 + ) + + # Format the timestamp as YYYY-MM-DD_HH_MM_SS_SSS + start_time_formatted = start_time.strftime("%Y-%m-%d_%H-%M-%S-%f")[:-3] + + temp_dir = tempfile.gettempdir() + + # Craft system-agnostic temporary filename + filename = os.path.join( + temp_dir, f"{dut_id}.{test_name}.{start_time_formatted}.json" + ) + + # Use the existing OutputToJSON callback to write to the custom file + output_callback = json_factory.OutputToJSON( + filename, + inline_attachments=False, # Exclude raw attachments + allow_nan=self.allow_nan, + indent=4, + ) + + # Open the custom file and write serialized test record to it + with open(filename, "w", encoding="utf-8") as file: + for json_line in output_callback.serialize_test_record(test_record): + file.write(json_line) + + try: + # Call create_run_from_report with the generated file path + result = self.client.upload_and_create_from_openhtf_report(filename) + + # Extract run_id from response - it could be a string (id) or a dict (result with id field) + run_id = None + + if isinstance(result, dict): + # It's a dictionary response + if not result.get("success", True): + self._logger.error("Run creation failed, skipping attachments") + return + + # Try to get the ID from the dictionary + run_id = result.get("id") + else: + # Direct ID string + run_id = result + + # Final validation of run_id + if not run_id or not isinstance(run_id, str): + self._logger.error( + f"Invalid run ID received: {run_id}, skipping attachments" + ) + return + except Exception as e: + self._logger.error(f"Error creating run: {str(e)}") + return + finally: + # Ensure the file is deleted after processing + if os.path.exists(filename): + os.remove(filename) + + # Process attachments number_of_attachments = 0 - for phase in test_record.phases: + for phase_idx, phase in enumerate(test_record.phases): + # Count attachments silently + attachment_count = len(phase.attachments) + # Keep only max number of attachments if number_of_attachments >= self._max_attachments: self._logger.warning( - "Too many attachments, trimming to %d attachments.", - self._max_attachments, + f"Attachment limit ({self._max_attachments}) reached" ) break + + # Process each attachment in the phase for attachment_name, attachment in phase.attachments.items(): # Remove attachments that exceed the max file size if attachment.size > self._max_file_size: - self._logger.warning( - "File size exceeds the maximum allowed size of %d bytes: %s", - self._max_file_size, - attachment.name, - ) + self._logger.warning(f"File too large: {attachment_name}") continue if number_of_attachments >= self._max_attachments: break number_of_attachments += 1 - self._logger.info("Uploading %s...", attachment_name) + # Use LoggerStateManager to temporarily activate the logger + with LoggerStateManager(self._logger): + self._logger.info(f"Uploading attachment...") # Upload initialization initialize_url = f"{self._url}/uploads/initialize" payload = {"name": attachment_name} - response = requests.post( - initialize_url, - data=json.dumps(payload), - headers=self._headers, - verify=self._verify, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - - response.raise_for_status() - response_json = response.json() - upload_url = response_json.get("uploadUrl") - upload_id = response_json.get("id") - - requests.put( - upload_url, - data=attachment.data, - headers={"Content-Type": attachment.mimetype}, - timeout=SECONDS_BEFORE_TIMEOUT, - ) + try: + response = requests.post( + initialize_url, + data=json.dumps(payload), + headers=self._headers, + verify=self._verify, + timeout=SECONDS_BEFORE_TIMEOUT, + ) - notify_server( - self._headers, - self._url, - upload_id, - run_id, - self._verify, - ) + response.raise_for_status() + response_json = response.json() + upload_url = response_json.get("uploadUrl") + upload_id = response_json.get("id") + + # Handle file attachments created with test.attach_from_file + try: + attachment_data = attachment.data + + # Some OpenHTF implementations have file path in the attachment object + if hasattr(attachment, "file_path") and getattr( + attachment, "file_path" + ): + try: + with open( + getattr(attachment, "file_path"), "rb" + ) as f: + attachment_data = f.read() + self._logger.info( + f"Read file data from {attachment.file_path}" + ) + except Exception as e: + self._logger.warning( + f"Could not read from file_path: {str(e)}" + ) + # Continue with attachment.data + + requests.put( + upload_url, + data=attachment_data, + headers={"Content-Type": attachment.mimetype}, + timeout=SECONDS_BEFORE_TIMEOUT, + ) + except Exception as e: + self._logger.error(f"Error uploading data: {str(e)}") + continue + + notify_server( + self._headers, + self._url, + upload_id, + run_id, + logger=self._logger, + ) - self._logger.success( - "Attachment %s successfully uploaded and linked to run.", - attachment_name, - ) + # Use LoggerStateManager to temporarily activate the logger + with LoggerStateManager(self._logger): + self._logger.success( + f"Uploaded attachment: {attachment_name}" + ) + except Exception as e: + # Use LoggerStateManager to temporarily activate the logger + with LoggerStateManager(self._logger): + self._logger.error( + f"Failed to process attachment: {str(e)}" + ) + continue + finally: + # For attachment logs to be visible, we intentionally don't pause the logger here + # Instead, we'll let the TofuPilot class's __exit__ method handle the logger state + pass diff --git a/tofupilot/utils/__init__.py b/tofupilot/utils/__init__.py index 7fecdd2..4038c17 100644 --- a/tofupilot/utils/__init__.py +++ b/tofupilot/utils/__init__.py @@ -1,10 +1,12 @@ -from .logger import setup_logger +from .logger import setup_logger, LoggerStateManager from .version_checker import check_latest_version from .files import ( validate_files, upload_file, notify_server, upload_attachments, + upload_attachment_data, + process_openhtf_attachments, log_and_raise, ) from .dates import ( @@ -17,15 +19,19 @@ handle_response, handle_http_error, handle_network_error, + api_request, ) __all__ = [ "setup_logger", + "LoggerStateManager", "check_latest_version", "validate_files", "upload_file", "notify_server", "upload_attachments", + "upload_attachment_data", + "process_openhtf_attachments", "parse_error_message", "timedelta_to_iso", "duration_to_iso", @@ -34,4 +40,5 @@ "handle_response", "handle_http_error", "handle_network_error", + "api_request", ] diff --git a/tofupilot/utils/files.py b/tofupilot/utils/files.py index 5dd3616..2f456fc 100644 --- a/tofupilot/utils/files.py +++ b/tofupilot/utils/files.py @@ -3,10 +3,11 @@ from logging import Logger import os import sys -from typing import List, Dict, Optional +from typing import List, Dict, Optional, Union import requests from ..constants.requests import SECONDS_BEFORE_TIMEOUT +from .logger import LoggerStateManager def log_and_raise(logger: Logger, error_message: str): @@ -46,19 +47,8 @@ def upload_file( headers: dict, url: str, file_path: str, - verify: Optional[str] = None, -) -> bool: - """Initializes an upload and stores file in it - - Args: - headers (dict): Request headers including authorization - url (str): Base API URL - file_path (str): Path to the file to upload - verify (Optional[str]): Path to a CA bundle file to verify the server certificate - - Returns: - str: The ID of the created upload - """ +) -> str: + """Initializes an upload and stores file in it""" # Upload initialization initialize_url = f"{url}/uploads/initialize" file_name = os.path.basename(file_path) @@ -91,62 +81,329 @@ def upload_file( def notify_server( - headers: dict, - url: str, - upload_id: str, - run_id: str, - verify: Optional[str] = None, + headers: dict, url: str, upload_id: str, run_id: str, logger=None ) -> bool: - """Tells TP server to sync upload with newly created run - - Args: - headers (dict): Request headers including authorization - url (str): Base API URL - upload_id (str): ID of the upload to link - run_id (str): ID of the run to link to - verify (Optional[str]): Path to a CA bundle file to verify the server certificate - - Returns: - bool: True if successful - """ + """Tells TP server to sync upload with newly created run""" sync_url = f"{url}/uploads/sync" sync_payload = {"upload_id": upload_id, "run_id": run_id} - response = requests.post( - sync_url, - data=json.dumps(sync_payload), - verify=verify, - headers=headers, - timeout=SECONDS_BEFORE_TIMEOUT, - ) + try: + response = requests.post( + sync_url, + data=json.dumps(sync_payload), + headers=headers, + timeout=SECONDS_BEFORE_TIMEOUT, + ) + response.raise_for_status() + + return True + except Exception as e: + # If logger is available, log the error properly + if logger: + with LoggerStateManager(logger): + logger.error(f"Failed to sync attachment: {str(e)}") + return False - return response.status_code == 200 + +def upload_attachment_data( + logger: Logger, headers: dict, url: str, name: str, data, mimetype: str, run_id: str +) -> bool: + """ + Uploads binary data as an attachment and links it to a run + + Uses LoggerStateManager to ensure proper logging, similar to OpenHTF implementation. + """ + try: + initialize_url = f"{url}/uploads/initialize" + payload = {"name": name} + + response = requests.post( + initialize_url, + data=json.dumps(payload), + headers=headers, + timeout=SECONDS_BEFORE_TIMEOUT, + ) + response.raise_for_status() + + # Get upload details + response_json = response.json() + upload_url = response_json.get("uploadUrl") + upload_id = response_json.get("id") + + # Upload the actual data + content_type = mimetype or "application/octet-stream" + upload_response = requests.put( + upload_url, + data=data, + headers={"Content-Type": content_type}, + timeout=SECONDS_BEFORE_TIMEOUT, + ) + upload_response.raise_for_status() + + # Link attachment to run + notify_server(headers, url, upload_id, run_id, logger) + + # Log success with LoggerStateManager for visibility + with LoggerStateManager(logger): + logger.success(f"Uploaded attachment: {name}") + return True + except Exception as e: + # Log error with LoggerStateManager for visibility + with LoggerStateManager(logger): + logger.error(f"Upload failed: {name} - {str(e)}") + return False def upload_attachments( logger: Logger, headers: dict, url: str, - paths: List[Dict[str, Optional[str]]], + paths: List[str], run_id: str, verify: Optional[str] = None, ): - """Creates one upload per file and stores them into TofuPilot - - Args: - logger (Logger): Logger instance - headers (dict): Request headers including authorization - url (str): Base API URL - paths (List[Dict[str, Optional[str]]]): List of file paths to upload - run_id (str): ID of the run to link files to - verify (Optional[str]): Path to a CA bundle file to verify the server certificate """ + Creates one upload per file path and stores them into TofuPilot + + Uses LoggerStateManager to ensure logging is properly handled during the upload process, + similar to the OpenHTF implementation. + """ + # Print a visual separator before attachment uploads + print("") + for file_path in paths: - logger.info("Uploading %s...", file_path) + # Use LoggerStateManager to ensure logger is active for each file + with LoggerStateManager(logger): + logger.info(f"Uploading attachment: {file_path}") - upload_id = upload_file(headers, url, file_path, verify) - notify_server(headers, url, upload_id, run_id, verify) + try: + # Verify file exists + if not os.path.exists(file_path): + with LoggerStateManager(logger): + logger.error(f"File not found: {file_path}") + continue - logger.success( - f"Attachment {file_path} successfully uploaded and linked to run." - ) + # Open file and prepare for upload + with open(file_path, "rb") as file: + name = os.path.basename(file_path) + data = file.read() + mimetype, _ = ( + mimetypes.guess_type(file_path) or "application/octet-stream" + ) + + # Use shared upload function + upload_attachment_data( + logger, headers, url, name, data, mimetype, run_id + ) + except Exception as e: + # Use LoggerStateManager to ensure error is visible + with LoggerStateManager(logger): + logger.error(f"Upload failed: {file_path} - {str(e)}") + continue + + +def process_openhtf_attachments( + logger: Logger, + headers: dict, + url: str, + test_record: Union[Dict, object], + run_id: str, + max_attachments: int, + max_file_size: int, + needs_base64_decode: bool = True, +) -> None: + """ + Process attachments from an OpenHTF test record and upload them. + + This function centralizes the attachment processing logic used in both the + direct TofuPilotClient.create_run_from_openhtf_report and the OpenHTF output callback. + + Uses LoggerStateManager to ensure proper logging visibility throughout the process, + similar to the OpenHTF implementation. + + Args: + logger: Logger for output messages + headers: HTTP headers for API authentication + url: Base API URL + test_record: OpenHTF test record (either as dict or object) + run_id: ID of the run to attach files to + max_attachments: Maximum number of attachments to process + max_file_size: Maximum size per attachment + needs_base64_decode: Whether attachment data is base64 encoded (true for dict format) + """ + # Print a visual separator + print("") + + # Use LoggerStateManager instead of directly resuming/pausing + with LoggerStateManager(logger): + logger.info("Processing attachments from test record") + + try: + attachment_count = 0 + + # Extract phases from test record based on type + if isinstance(test_record, dict): + phases = test_record.get("phases", []) + with LoggerStateManager(logger): + logger.info(f"Found {len(phases)} phases in JSON test record") + else: + phases = getattr(test_record, "phases", []) + with LoggerStateManager(logger): + logger.info(f"Found {len(phases)} phases in object test record") + + # Iterate through phases and their attachments + for i, phase in enumerate(phases): + # Skip if we've reached attachment limit + if attachment_count >= max_attachments: + with LoggerStateManager(logger): + logger.warning(f"Attachment limit ({max_attachments}) reached") + break + + # Get attachments based on record type + if isinstance(test_record, dict): + phase_attachments = phase.get("attachments", {}) + phase_name = phase.get("name", f"Phase {i}") + else: + phase_attachments = getattr(phase, "attachments", {}) + phase_name = getattr(phase, "name", f"Phase {i}") + + # Skip if phase has no attachments + if not phase_attachments: + continue + + with LoggerStateManager(logger): + logger.info( + f"Processing {len(phase_attachments)} attachments in {phase_name}" + ) + + # Process each attachment in the phase + for name, attachment in phase_attachments.items(): + # Skip if we've reached attachment limit + if attachment_count >= max_attachments: + break + + # Debug attachment details (using debug level to avoid cluttering the console) + if isinstance(test_record, dict): + with LoggerStateManager(logger): + logger.debug(f"Attachment: {name}, Type: JSON format") + else: + attrs = [ + attr for attr in dir(attachment) if not attr.startswith("_") + ] + with LoggerStateManager(logger): + logger.debug( + f"Attachment: {name}, Type: Object, Attributes: {attrs}" + ) + + # Get attachment data and size based on record type + if isinstance(test_record, dict): + # Dict format (from JSON file) + attachment_data = attachment.get("data", "") + if not attachment_data: + with LoggerStateManager(logger): + logger.warning(f"No data in: {name}") + continue + + try: + if needs_base64_decode: + import base64 + + data = base64.b64decode(attachment_data) + else: + data = attachment_data + + attachment_size = len(data) + mimetype = attachment.get( + "mimetype", "application/octet-stream" + ) + except Exception as e: + with LoggerStateManager(logger): + logger.error( + f"Failed to process attachment data: {name} - {str(e)}" + ) + continue + else: + # Object format (from callback) + attachment_data = getattr(attachment, "data", None) + + # Handle different attachment types in OpenHTF + if attachment_data is None: + with LoggerStateManager(logger): + logger.warning(f"No data in: {name}") + continue + + # Handle file-based attachments in different formats + data = None + + # Option 1: Check for direct file_path attribute + if hasattr(attachment, "file_path") and getattr( + attachment, "file_path" + ): + try: + file_path = getattr(attachment, "file_path") + with LoggerStateManager(logger): + logger.info(f"Found file_path attribute: {file_path}") + with open(file_path, "rb") as f: + data = f.read() + except Exception as e: + with LoggerStateManager(logger): + logger.error(f"Failed to read from file_path: {str(e)}") + + # Option 2: Check for filename attribute (used in some OpenHTF versions) + elif hasattr(attachment, "filename") and getattr( + attachment, "filename" + ): + try: + file_path = getattr(attachment, "filename") + with LoggerStateManager(logger): + logger.info(f"Found filename attribute: {file_path}") + with open(file_path, "rb") as f: + data = f.read() + except Exception as e: + with LoggerStateManager(logger): + logger.error(f"Failed to read from filename: {str(e)}") + + # Option 3: Use the data attribute directly + else: + with LoggerStateManager(logger): + logger.info("Using data attribute directly") + data = attachment_data + + # Verify we have valid data + if data is None: + with LoggerStateManager(logger): + logger.error(f"No valid data found for attachment: {name}") + continue + + # Get size from attribute or calculate it + attachment_size = getattr(attachment, "size", len(data)) + mimetype = getattr( + attachment, "mimetype", "application/octet-stream" + ) + + # Skip oversized attachments + if attachment_size > max_file_size: + with LoggerStateManager(logger): + logger.warning(f"File too large: {name}") + continue + + # Increment counter and process the attachment + attachment_count += 1 + + # Use unified attachment upload function - logging is handled inside this function + try: + success = upload_attachment_data( + logger, headers, url, name, data, mimetype, run_id + ) + + # Don't log success/failure here as it's already logged in upload_attachment_data + except Exception as e: + with LoggerStateManager(logger): + logger.error( + f"Exception during attachment upload: {name} - {str(e)}" + ) + # Continue with other attachments regardless of success/failure + finally: + # We intentionally don't pause the logger here, as in the OpenHTF implementation + # This allows any final log messages to be visible + pass diff --git a/tofupilot/utils/logger.py b/tofupilot/utils/logger.py index 4cc2efd..2cc5294 100644 --- a/tofupilot/utils/logger.py +++ b/tofupilot/utils/logger.py @@ -1,5 +1,10 @@ import logging import sys +import os +import time +import threading +import queue +from datetime import datetime # Define a custom log level for success messages SUCCESS_LEVEL_NUM = 25 @@ -7,7 +12,7 @@ def success(self, message, *args, **kws): - """Log a success message.""" + """Log success at custom level.""" if self.isEnabledFor(SUCCESS_LEVEL_NUM): self._log(SUCCESS_LEVEL_NUM, message, args, **kws) @@ -15,41 +20,197 @@ def success(self, message, *args, **kws): logging.Logger.success = success +def add_pause_methods_to_logger(logger): + """Add pause/resume functionality to a logger""" + original_handlers = list(logger.handlers) + + def pause(): + """Temporarily disable all handlers""" + for handler in logger.handlers: + logger.removeHandler(handler) + + def resume(): + """Re-enable handlers""" + for handler in original_handlers: + if handler not in logger.handlers: + logger.addHandler(handler) + + # Add methods to logger + logger.pause = pause + logger.resume = resume + + return logger + + +class LoggerStateManager: + """Context manager for temporarily ensuring logger is active""" + + def __init__(self, logger): + self.logger = logger + self.was_resumed = False + + def __enter__(self): + # Ensure logger is active for this block + if hasattr(self.logger, 'resume'): + self.logger.resume() + self.was_resumed = True + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + # If we resumed the logger, restore to paused state + if self.was_resumed and hasattr(self.logger, 'pause'): + self.logger.pause() + + +class TofupilotFormatter(logging.Formatter): + """Minimal formatter with colors and no timestamp.""" + + # ANSI color codes + RESET = "\033[0m" + BLUE = "\033[0;34m" + GREEN = "\033[0;32m" + YELLOW = "\033[0;33m" + RED = "\033[0;31m" + RED_BG = "\033[1;41m" + GRAY = "\033[0;37m" + BOLD = "\033[1m" + + # Log level name mapping + LEVEL_NAMES = { + logging.DEBUG: "DBG", + logging.INFO: "INF", + logging.WARNING: "WRN", + logging.ERROR: "ERR", + logging.CRITICAL: "CRT", + SUCCESS_LEVEL_NUM: "OK!", + } + + # Color mapping for levels + LEVEL_COLORS = { + logging.DEBUG: GRAY, + logging.INFO: BLUE, + logging.WARNING: YELLOW, + logging.ERROR: RED, + logging.CRITICAL: RED_BG, + SUCCESS_LEVEL_NUM: GREEN, + } + + def __init__(self): + """Initialize formatter.""" + super().__init__() + + def format(self, record): + """Format log with minimal prefix and colors.""" + # Get log level color and short name + level_color = self.LEVEL_COLORS.get(record.levelno, self.RESET) + level_name = self.LEVEL_NAMES.get(record.levelno, "???") + + # Create minimal prefix with no timestamp + prefix = f"{level_color}{self.BOLD}TP{self.RESET}{level_color}:{level_name} " + + # Add log message with color + message = record.getMessage() + formatted_message = f"{prefix}{message}{self.RESET}" + + # Add exception info if present + if record.exc_info: + exc_text = self.formatException(record.exc_info) + formatted_message += f"\n{exc_text}" + + return formatted_message + + +# Simple formatter for backward compatibility class CustomFormatter(logging.Formatter): - """Custom formatter to add symbols and colors to the log messages.""" + """Simple formatter with no timestamp.""" reset_code = "\033[0m" format_dict = { - logging.DEBUG: "\033[0;37m%(asctime)s - DEBUG: %(message)s" - + reset_code, # White - logging.INFO: "\033[0;34m%(asctime)s - ℹ️ %(message)s" - + reset_code, # Blue with info symbol - logging.WARNING: "\033[0;33m%(asctime)s - ⚠️ %(message)s" - + reset_code, # Yellow with warning symbol - logging.ERROR: "\033[0;31m%(asctime)s - ❌ %(message)s" - + reset_code, # Red with cross mark - logging.CRITICAL: "\033[1;41m%(asctime)s - 🚨 %(message)s" - + reset_code, # White on red background with alarm symbol - SUCCESS_LEVEL_NUM: "\033[0;32m%(asctime)s - ✅ %(message)s" - + reset_code, # Green with checkmark + logging.DEBUG: "\033[0;37mDEBUG: %(message)s" + reset_code, + logging.INFO: "\033[0;34mINFO: %(message)s" + reset_code, + logging.WARNING: "\033[0;33mWARN: %(message)s" + reset_code, + logging.ERROR: "\033[0;31mERROR: %(message)s" + reset_code, + logging.CRITICAL: "\033[1;41mCRIT: %(message)s" + reset_code, + SUCCESS_LEVEL_NUM: "\033[0;32mSUCCESS: %(message)s" + reset_code, } def format(self, record): - """Format the specified record as text.""" + """Format record with minimal prefix.""" log_fmt = self.format_dict.get(record.levelno, self._fmt) - formatter = logging.Formatter(log_fmt, datefmt="%Y-%m-%d %H:%M:%S") + formatter = logging.Formatter(log_fmt) return formatter.format(record) -def setup_logger(log_level: int): - """Set up the logger with a custom formatter and stream handler.""" - logger = logging.getLogger(__name__) +class LogLevelFilter(logging.Filter): + """Sets log level from environment variable.""" + + def __init__(self, env_var='TOFUPILOT_LOG_LEVEL'): + """Init with env var name.""" + super().__init__() + self.env_var = env_var + self.level = self._get_level_from_env() + + def _get_level_from_env(self): + """Get level from env var.""" + level_str = os.environ.get(self.env_var, 'INFO').upper() + level_map = { + 'DEBUG': logging.DEBUG, + 'INFO': logging.INFO, + 'SUCCESS': SUCCESS_LEVEL_NUM, + 'WARNING': logging.WARNING, + 'WARN': logging.WARNING, + 'ERROR': logging.ERROR, + 'CRITICAL': logging.CRITICAL + } + return level_map.get(level_str, logging.INFO) + + def filter(self, record): + """Apply dynamic level filtering.""" + # Reload level from env for each record to allow runtime changes + self.level = self._get_level_from_env() + return record.levelno >= self.level + + +def setup_logger(log_level=None, advanced_format=True): + """Configure logger with minimal formatting and color support. + + Args: + log_level: Override env var TOFUPILOT_LOG_LEVEL + advanced_format: Use TofupilotFormatter (default) or CustomFormatter + + Returns: + Configured logger instance + """ + # Maintain backward compatibility with __name__ + logger_name = "tofupilot" + logger = logging.getLogger(logger_name) + + # Clear existing handlers + if logger.handlers: + logger.handlers.clear() + + # Set level from arg or environment + level_filter = LogLevelFilter() + log_level = log_level or level_filter.level logger.setLevel(log_level) + + # Create handler handler = logging.StreamHandler(sys.stdout) handler.setLevel(log_level) - handler.setFormatter(CustomFormatter()) - if not logger.handlers: - logger.addHandler(handler) - - return logger + + # Choose formatter based on preference + if advanced_format: + handler.setFormatter(TofupilotFormatter()) + else: + handler.setFormatter(CustomFormatter()) + + handler.addFilter(level_filter) + + # Add handler to logger + logger.addHandler(handler) + + # Add pause/resume functionality + logger = add_pause_methods_to_logger(logger) + + return logger \ No newline at end of file diff --git a/tofupilot/utils/network.py b/tofupilot/utils/network.py index 5adae43..0c3059b 100644 --- a/tofupilot/utils/network.py +++ b/tofupilot/utils/network.py @@ -1,9 +1,11 @@ -from typing import Dict, List, Optional, Any +from typing import Dict, List, Optional, Any, Union import requests +from ..constants.requests import SECONDS_BEFORE_TIMEOUT def parse_error_message(response: requests.Response) -> str: + """Extract error message from response""" try: error_data = response.json() return error_data.get("error", {}).get( @@ -13,6 +15,29 @@ def parse_error_message(response: requests.Response) -> str: return f"HTTP error occurred: {response.text}" +def api_request( + logger, method: str, url: str, headers: Dict, + data: Optional[Dict] = None, + params: Optional[Dict] = None, + timeout: int = SECONDS_BEFORE_TIMEOUT +) -> Dict: + """Unified API request handler with consistent error handling""" + try: + response = requests.request( + method, url, + json=data, + headers=headers, + params=params, + timeout=timeout + ) + response.raise_for_status() + return handle_response(logger, response) + except requests.exceptions.HTTPError as http_err: + return handle_http_error(logger, http_err) + except requests.RequestException as e: + return handle_network_error(logger, e) + + def handle_response( logger, response: requests.Response, @@ -23,16 +48,42 @@ def handle_response( """ data = response.json() - # Logging warnings if present - warnings: Optional[List[str]] = data.get("warnings") - if warnings is not None: - for warning in warnings: - logger.warning(warning) - - # Logging success message if the JSON has one - message = data.get("message") - if message is not None: - logger.success(message) + # Ensure logger is active to process messages + was_resumed = False + if hasattr(logger, 'resume'): + logger.resume() + was_resumed = True + + try: + # Process warnings + warnings: Optional[List[str]] = data.get("warnings") + if warnings is not None: + for warning in warnings: + logger.warning(warning) + + # Process errors + errors = data.get("errors") or data.get("error") + if errors: + # Handle both array and single object formats + if isinstance(errors, list): + for error in errors: + if isinstance(error, dict) and "message" in error: + logger.error(error["message"]) + else: + logger.error(str(error)) + elif isinstance(errors, dict) and "message" in errors: + logger.error(errors["message"]) + elif isinstance(errors, str): + logger.error(errors) + + # Process success message + message = data.get("message") + if message is not None: + logger.success(message) + finally: + # Restore logger state if needed + if was_resumed and hasattr(logger, 'pause'): + logger.pause() # Returning the parsed JSON to the caller return data @@ -42,27 +93,42 @@ def handle_http_error( logger, http_err: requests.exceptions.HTTPError ) -> Dict[str, Any]: """Handles HTTP errors and logs them.""" - - warnings = None # Initialize warnings to None - - # Check if the response body is not empty and Content-Type is application/json - if ( - http_err.response.text.strip() - and http_err.response.headers.get("Content-Type") == "application/json" - ): - # Parse JSON safely - response_json = http_err.response.json() - warnings = response_json.get("warnings") - if warnings is not None: - for warning in warnings: - logger.warning(warning) - error_message = parse_error_message(http_err.response) - else: - # Handle cases where response is empty or non-JSON - error_message = http_err - - logger.error(error_message) - + warnings = None + + # Ensure logger is active to process messages + was_resumed = False + if hasattr(logger, 'resume'): + logger.resume() + was_resumed = True + + try: + # Extract error details from JSON response when available + if (http_err.response.text.strip() and + http_err.response.headers.get("Content-Type") == "application/json"): + try: + response_json = http_err.response.json() + + # Process warnings if present + warnings = response_json.get("warnings") + if warnings: + for warning in warnings: + logger.warning(warning) + + # Get error message + error_message = parse_error_message(http_err.response) + except ValueError: + error_message = str(http_err) + else: + error_message = str(http_err) + + # Log the error through the logger for proper formatting + logger.error(error_message) + finally: + # Restore logger state if needed + if was_resumed and hasattr(logger, 'pause'): + logger.pause() + + # Return structured error info return { "success": False, "message": None, @@ -74,7 +140,27 @@ def handle_http_error( def handle_network_error(logger, e: requests.RequestException) -> Dict[str, Any]: """Handles network errors and logs them.""" - logger.error(f"Network error: {e}") + # Ensure logger is active to process messages + was_resumed = False + if hasattr(logger, 'resume'): + logger.resume() + was_resumed = True + + try: + error_message = f"Network error: {str(e)}" + logger.error(error_message) + + # Provide SSL-specific guidance + if isinstance(e, requests.exceptions.SSLError) or "SSL" in str(e) or "certificate verify failed" in str(e): + logger.warning("SSL certificate verification error detected") + logger.warning("This is typically caused by missing or invalid SSL certificates") + logger.warning("Try: 1) pip install certifi 2) /Applications/Python*/Install Certificates.command") + finally: + # Restore logger state if needed + if was_resumed and hasattr(logger, 'pause'): + logger.pause() + + # Return structured error info return { "success": False, "message": None, diff --git a/tofupilot/utils/version_checker.py b/tofupilot/utils/version_checker.py index b017acc..b823519 100644 --- a/tofupilot/utils/version_checker.py +++ b/tofupilot/utils/version_checker.py @@ -22,7 +22,7 @@ def check_latest_version(logger, current_version, package_name: str): ) logger.warning(warning_message) except PackageNotFoundError: - logger.info(f"Package {package_name} is not installed.") + logger.info(f"Package not installed: {package_name}") except requests.RequestException as e: - logger.warning(f"Error checking the latest version: {e}") + logger.warning(f"Version check failed: {e}")