diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 1227cad..5bd9002 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -92,21 +92,22 @@ When the release PR is merged, and a new release is detected by the “Python pu If you need to test a new version of the Python client before making an official release, you can publish it to TestPyPI, a sandbox version of PyPI used for testing package distributions. -1. Build the package locally using: +1. If a previous test package with the exact same version was released, update the version in `setup.py`. For instance, change version="X.Y.Z.dev0" to version="X.Y.Z.dev1". +2. Build the package locally using: ```sh rm -rf dist/* python -m build ``` This will generate distribution files in the dist/ directory. -2. If a previous test package with the exact same version was released, update the version in `setup.py`. For instance, change version="X.Y.Z.dev0" to version="X.Y.Z.dev1". +3. Get a testpypi API key, for example ask your managment if they have one. Then run: ```sh - twine upload --repository testpypi dist/ + twine upload --repository testpypi dist/* ``` -3. To install the new test package, run: +4. To install the new test package, run: ```sh pip install -i https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple/ tofupilot== ``` diff --git a/data/performance-report.pdf b/data/performance-report.pdf deleted file mode 100644 index ce6914b..0000000 Binary files a/data/performance-report.pdf and /dev/null differ diff --git a/data/temperature-map.png b/data/temperature-map.png deleted file mode 100644 index c7eb24b..0000000 Binary files a/data/temperature-map.png and /dev/null differ diff --git a/requirements.txt b/requirements.txt index 30ab3a8..f1a1740 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,7 @@ packaging pytest websockets pytest +paho-mqtt +build +twine +certifi>=2023.7.22 diff --git a/setup.py b/setup.py index 22735a1..e7a242b 100644 --- a/setup.py +++ b/setup.py @@ -5,25 +5,39 @@ setup( name="tofupilot", - version="1.10.0", + version="1.11.1", packages=find_packages(), - install_requires=["requests", "setuptools", "packaging", "pytest", "websockets"], + install_requires=[ + "requests>=2.25.0", + "setuptools>=50.0.0", + "packaging>=20.0", + "pytest", + "paho-mqtt>=2.0.0", + "sentry-sdk>=1.0.0", + "certifi>=2020.12.5", + ], entry_points={ "pytest11": [ "tofupilot = tofupilot.plugin", # Registering the pytest plugin ], }, - author="TofuPilot", + author="TofuPilot Team", author_email="hello@tofupilot.com", - description="The official Python client for the TofuPilot API", + description="Official Python client for TofuPilot with OpenHTF integration, real-time streaming and file attachment support", license="MIT", - keywords="automatic hardware testing tofupilot", + keywords="automatic hardware testing tofupilot openhtf", long_description=long_description, long_description_content_type="text/markdown", url="https://github.com/tofupilot/python-client", classifiers=[ "Programming Language :: Python :: 3", "Operating System :: OS Independent", + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Intended Audience :: Manufacturing", + "Topic :: Scientific/Engineering", + "Topic :: Software Development :: Testing", + "Topic :: Software Development :: Libraries :: Python Modules", ], python_requires=">=3.9", ) diff --git a/tofupilot/client.py b/tofupilot/client.py index 7aee334..d226f82 100644 --- a/tofupilot/client.py +++ b/tofupilot/client.py @@ -8,8 +8,8 @@ from importlib.metadata import version import json -import base64 import requests +import certifi from .constants import ( ENDPOINT, @@ -29,7 +29,8 @@ handle_response, handle_http_error, handle_network_error, - notify_server, + api_request, + process_openhtf_attachments, ) @@ -55,9 +56,12 @@ def __init__( print_version_banner(self._current_version) self._logger = setup_logger(logging.INFO) + # Configure SSL certificate validation + self._setup_ssl_certificates() + self._api_key = api_key or os.environ.get("TOFUPILOT_API_KEY") if self._api_key is None: - error = "Please set TOFUPILOT_API_KEY environment variable. For more information on how to find or generate a valid API key, visit https://tofupilot.com/docs/user-management#api-key." # pylint: disable=line-too-long + error = "Please set TOFUPILOT_API_KEY environment variable. For more information on how to find or generate a valid API key, visit https://tofupilot.com/docs/user-management#api-key." self._logger.error(error) sys.exit(1) @@ -71,10 +75,21 @@ def __init__( self._max_file_size = FILE_MAX_SIZE check_latest_version(self._logger, self._current_version, "tofupilot") + def _setup_ssl_certificates(self): + """Configure SSL certificate validation using certifi if needed.""" + # Check if SSL_CERT_FILE is already set to a valid path + cert_file = os.environ.get("SSL_CERT_FILE") + if not cert_file or not os.path.isfile(cert_file): + # Use certifi's certificate bundle + certifi_path = certifi.where() + if os.path.isfile(certifi_path): + os.environ["SSL_CERT_FILE"] = certifi_path + self._logger.debug(f"SSL: Using certifi path {certifi_path}") + def _log_request(self, method: str, endpoint: str, payload: Optional[dict] = None): """Logs the details of the HTTP request.""" self._logger.debug( - "%s %s%s with payload: %s", method, self._url, endpoint, payload + "Request: %s %s%s payload=%s", method, self._url, endpoint, payload ) def create_run( # pylint: disable=too-many-arguments,too-many-locals @@ -127,7 +142,8 @@ def create_run( # pylint: disable=too-many-arguments,too-many-locals References: https://www.tofupilot.com/docs/api#create-a-run """ - self._logger.info("Starting run creation...") + print("") + self._logger.info("Creating run...") if attachments is not None: validate_files( @@ -171,35 +187,26 @@ def create_run( # pylint: disable=too-many-arguments,too-many-locals payload["report_variables"] = report_variables self._log_request("POST", "/runs", payload) + result = api_request( + self._logger, + "POST", + f"{self._url}/runs", + self._headers, + data=payload, + verify=self._verify, + ) - try: - response = requests.post( - f"{self._url}/runs", - json=payload, - headers=self._headers, - timeout=SECONDS_BEFORE_TIMEOUT, - verify=self._verify, + # Upload attachments if run was created successfully + run_id = result.get("id") + if run_id and attachments: + # Ensure logger is active for attachment uploads + if hasattr(self._logger, 'resume'): + self._logger.resume() + + upload_attachments( + self._logger, self._headers, self._url, attachments, run_id, self._verify, ) - response.raise_for_status() - result = handle_response(self._logger, response) - - run_id = result.get("id") - if run_id and attachments: - upload_attachments( - self._logger, - self._headers, - self._url, - attachments, - run_id, - self._verify, - ) - - return result - - except requests.exceptions.HTTPError as http_err: - return handle_http_error(self._logger, http_err) - except requests.RequestException as e: - return handle_network_error(self._logger, e) + return result def create_run_from_openhtf_report(self, file_path: str): """ @@ -219,74 +226,54 @@ def create_run_from_openhtf_report(self, file_path: str): # Upload report and create run from file_path run_id = self.upload_and_create_from_openhtf_report(file_path) + # If run_id is not a string, it's an error response dictionary + if not isinstance(run_id, str): + self._logger.error("OpenHTF import failed") + return run_id + + # Only continue with attachment upload if run_id is valid + test_record = None try: with open(file_path, "r", encoding="utf-8") as file: test_record = json.load(file) except FileNotFoundError: - print(f"Error: The file '{file_path}' was not found.") + self._logger.error(f"File not found: {file_path}") + return run_id except json.JSONDecodeError: - print(f"Error: The file '{file_path}' contains invalid JSON.") + self._logger.error(f"Invalid JSON: {file_path}") + return run_id except PermissionError: - print(f"Error: Insufficient permissions to read '{file_path}'.") + self._logger.error(f"Permission denied: {file_path}") + return run_id except Exception as e: - print(f"Unexpected error: {e}") - - if run_id and test_record: - number_of_attachments = 0 - for phase in test_record.get("phases"): - # Keep only max number of attachments - if number_of_attachments >= self._max_attachments: - self._logger.warning( - "Too many attachments, trimming to %d attachments.", - self._max_attachments, - ) - break - for attachment_name, attachment in phase.get("attachments").items(): - number_of_attachments += 1 - - self._logger.info("Uploading %s...", attachment_name) - - # Upload initialization - initialize_url = f"{self._url}/uploads/initialize" - payload = {"name": attachment_name} - - response = requests.post( - initialize_url, - data=json.dumps(payload), - headers=self._headers, - verify=self._verify, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - - response.raise_for_status() - response_json = response.json() - upload_url = response_json.get("uploadUrl") - upload_id = response_json.get("id") - - data = base64.b64decode(attachment["data"]) - - requests.put( - upload_url, - data=data, - headers={ - "Content-Type": attachment["mimetype"] - or "application/octet-stream", # Default to binary if mimetype is missing - }, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - - notify_server( - self._headers, - self._url, - upload_id, - run_id, - self._verify, - ) - - self._logger.success( - "Attachment %s successfully uploaded and linked to run.", - attachment_name, - ) + self._logger.error(f"Error: {e}") + return run_id + + # Now safely proceed with attachment upload + if run_id and test_record and "phases" in test_record: + # Add a visual separator after the run success message + print("") + self._logger.info("Processing attachments from OpenHTF test record") + + # Use the centralized function to process all attachments + process_openhtf_attachments( + self._logger, + self._headers, + self._url, + test_record, + run_id, + self._max_attachments, + self._max_file_size, + needs_base64_decode=True, # JSON attachments need base64 decoding + verify=self._verify, + ) + else: + if not test_record: + self._logger.error("Test record load failed") + elif "phases" not in test_record: + self._logger.error("No phases in test record") + + return run_id def get_runs(self, serial_number: str) -> dict: """ @@ -314,28 +301,17 @@ def get_runs(self, serial_number: str) -> dict: "error": {"message": error_message}, } - self._logger.info( - "Fetching runs for unit with serial number %s...", serial_number - ) + self._logger.info("Fetching runs for: %s", serial_number) params = {"serial_number": serial_number} - self._log_request("GET", "/runs", params) - - try: - response = requests.get( - f"{self._url}/runs", - headers=self._headers, - verify=self._verify, - params=params, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - response.raise_for_status() - return handle_response(self._logger, response) - - except requests.exceptions.HTTPError as http_err: - return handle_http_error(self._logger, http_err) - except requests.RequestException as e: - return handle_network_error(self._logger, e) + return api_request( + self._logger, + "GET", + f"{self._url}/runs", + self._headers, + params=params, + verify=self._verify, + ) def delete_run(self, run_id: str) -> dict: """ @@ -351,24 +327,15 @@ def delete_run(self, run_id: str) -> dict: References: https://www.tofupilot.com/docs/api#delete-a-run """ - self._logger.info('Starting deletion of run "%s"...', run_id) - + self._logger.info("Deleting run: %s", run_id) self._log_request("DELETE", f"/runs/{run_id}") - - try: - response = requests.delete( - f"{self._url}/runs/{run_id}", - headers=self._headers, - verify=self._verify, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - response.raise_for_status() - return handle_response(self._logger, response) - - except requests.exceptions.HTTPError as http_err: - return handle_http_error(self._logger, http_err) - except requests.RequestException as e: - return handle_network_error(self._logger, e) + return api_request( + self._logger, + "DELETE", + f"{self._url}/runs/{run_id}", + self._headers, + verify=self._verify, + ) def update_unit( self, serial_number: str, sub_units: Optional[List[SubUnit]] = None @@ -389,27 +356,17 @@ def update_unit( References: https://www.tofupilot.com/docs/api#update-a-unit """ - self._logger.info('Starting update of unit "%s"...', serial_number) - + self._logger.info("Updating unit: %s", serial_number) payload = {"sub_units": sub_units} - self._log_request("PATCH", f"/units/{serial_number}", payload) - - try: - response = requests.patch( - f"{self._url}/units/{serial_number}", - json=payload, - headers=self._headers, - verify=self._verify, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - response.raise_for_status() - return handle_response(self._logger, response) - - except requests.exceptions.HTTPError as http_err: - return handle_http_error(self._logger, http_err) - except requests.RequestException as e: - return handle_network_error(self._logger, e) + return api_request( + self._logger, + "PATCH", + f"{self._url}/units/{serial_number}", + self._headers, + data=payload, + verify=self._verify, + ) def delete_unit(self, serial_number: str) -> dict: """ @@ -426,24 +383,15 @@ def delete_unit(self, serial_number: str) -> dict: References: https://www.tofupilot.com/docs/api#delete-a-unit """ - self._logger.info('Starting deletion of unit "%s"...', serial_number) - + self._logger.info("Deleting unit: %s", serial_number) self._log_request("DELETE", f"/units/{serial_number}") - - try: - response = requests.delete( - f"{self._url}/units/{serial_number}", - headers=self._headers, - verify=self._verify, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - response.raise_for_status() - return handle_response(self._logger, response) - - except requests.exceptions.HTTPError as http_err: - return handle_http_error(self._logger, http_err) - except requests.RequestException as e: - return handle_network_error(self._logger, e) + return api_request( + self._logger, + "DELETE", + f"{self._url}/units/{serial_number}", + self._headers, + verify=self._verify, + ) def upload_and_create_from_openhtf_report( self, @@ -457,7 +405,8 @@ def upload_and_create_from_openhtf_report( Id of the newly created run """ - self._logger.info("Starting run creation...") + print("") + self._logger.info("Importing run...") # Validate report validate_files( @@ -466,13 +415,20 @@ def upload_and_create_from_openhtf_report( # Upload report try: - upload_id = upload_file( - self._headers, self._url, file_path, self._verify - ) + # First, check if we have a valid API key directly (avoids cryptic errors) + if not self._api_key or len(self._api_key) < 10: + self._logger.error("API key error: Invalid API key format.") + return {"success": False, "error": {"message": "Invalid API key format."}} + + upload_id = upload_file(self._headers, self._url, file_path, self._verify) except requests.exceptions.HTTPError as http_err: - return handle_http_error(self._logger, http_err) + error_info = handle_http_error(self._logger, http_err) + # Error already logged by handle_http_error + return error_info except requests.RequestException as e: - return handle_network_error(self._logger, e) + error_info = handle_network_error(self._logger, e) + # Error already logged by handle_network_error + return error_info payload = { "upload_id": upload_id, @@ -483,57 +439,68 @@ def upload_and_create_from_openhtf_report( self._log_request("POST", "/import", payload) - # Create run from file - try: - response = requests.post( - f"{self._url}/import", - json=payload, - headers=self._headers, - verify=self._verify, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - response.raise_for_status() - result = handle_response(self._logger, response) + # Create run from file using unified API request handler + result = api_request( + self._logger, + "POST", + f"{self._url}/import", + self._headers, + data=payload, + verify=self._verify, + ) + # Return only the ID if successful, otherwise return the full result + if result.get("success", False) is not False: run_id = result.get("id") + run_url = result.get("url") - return run_id + # Explicitly log success with URL if available + if run_url: + self._logger.success(f"Run imported successfully: {run_url}") + elif run_id: + self._logger.success(f"Run imported successfully with ID: {run_id}") - except requests.exceptions.HTTPError as http_err: - return handle_http_error(self._logger, http_err) - except requests.RequestException as e: - return handle_network_error(self._logger, e) + return run_id + else: + return result - def get_websocket_url(self) -> dict: + def get_connection_credentials(self) -> dict: """ - Fetches websocket connection url associated with API Key. + Fetches credentials required to livestream test results. Returns: - str: - Websocket connection URL. + values: + a dict containing the emqx server url, the topic to connect to, and the JWT token required to connect """ - try: response = requests.get( - f"{self._url}/rooms", + f"{self._url}/streaming", headers=self._headers, verify=self._verify, timeout=SECONDS_BEFORE_TIMEOUT, ) response.raise_for_status() values = handle_response(self._logger, response) - url = values.get("url") - return url - + return values except requests.exceptions.HTTPError as http_err: - return handle_http_error(self._logger, http_err) + handle_http_error(self._logger, http_err) + return None except requests.RequestException as e: - return handle_network_error(self._logger, e) + handle_network_error(self._logger, e) + return None def print_version_banner(current_version: str): - """Prints current version of client""" - banner = f""" - TofuPilot Python Client {current_version} - """ - print(banner.strip()) + """Prints current version of client with tofu art""" + # Colors for the tofu art + yellow = "\033[33m" # Yellow for the plane + blue = "\033[34m" # Blue for the cap border + reset = "\033[0m" # Reset color + + banner = ( + f"{blue}╭{reset} {yellow}✈{reset} {blue}╮{reset}\n" + f"[•ᴗ•] TofuPilot Python Client {current_version}\n" + "\n" + ) + + print(banner, end="") diff --git a/tofupilot/openhtf/__init__.py b/tofupilot/openhtf/__init__.py index 8f9ff42..6611486 100644 --- a/tofupilot/openhtf/__init__.py +++ b/tofupilot/openhtf/__init__.py @@ -1,9 +1,8 @@ -""" -This module handles all TofuPilot methods related to integration with OpenHTF. +"""TofuPilot integration with OpenHTF. It provides two main classes: -1. tofupilot.upload(): A way to interface with OpenHTF test scripts to automatically upload test results to the TofuPilot server. -2. tofupilot.TofuPilot(): A way to stream real-time execution data of OpenHTF tests to TofuPilot for live monitoring. +1. upload(): Upload OpenHTF test results to TofuPilot +2. TofuPilot(): Stream real-time test execution data for monitoring """ from .upload import upload diff --git a/tofupilot/openhtf/tofupilot.py b/tofupilot/openhtf/tofupilot.py index 649bf06..3f6a051 100644 --- a/tofupilot/openhtf/tofupilot.py +++ b/tofupilot/openhtf/tofupilot.py @@ -1,13 +1,16 @@ +import types from typing import Optional from time import sleep import threading -import asyncio import json from openhtf import Test from openhtf.util import data -from websockets import connect, ConnectionClosedError, InvalidURI +import paho.mqtt.client as mqtt +from paho.mqtt.enums import CallbackAPIVersion +from openhtf.core.test_record import TestRecord +from openhtf.core.test_state import TestState from .upload import upload from ..client import TofuPilotClient @@ -32,7 +35,7 @@ def _get_executing_test(): return test, test_state -def _to_dict_with_event(test_state): +def _to_dict_with_event(test_state: TestState): """Process a test state into the format we want to send to the frontend.""" original_dict, event = test_state.asdict_with_event() @@ -76,7 +79,7 @@ def stop(self): class TofuPilot: """ Context manager to automatically add an output callback to the running OpenHTF test - and live stream it's execution. + and live stream it's execution to the Operator UI. ### Usage Example: @@ -90,16 +93,20 @@ class TofuPilot: def main(): test = Test(*your_phases, procedure_id="FVT1") - # Stream real-time test execution data to TofuPilot + # Stream real-time test execution data to TofuPilot Operator UI with TofuPilot(test): - test.execute(lambda: "SN15") + # For more reliable Ctrl+C handling, use the helper function: + execute_with_graceful_exit(test, test_start=lambda: "SN15") + + # Or use the standard method (may show errors on Ctrl+C): + # test.execute(lambda: "SN15") ``` """ def __init__( self, test: Test, - stream: Optional[bool] = True, + stream: Optional[bool] = True, # Controls connection to Operator UI api_key: Optional[str] = None, url: Optional[str] = None, ): @@ -108,125 +115,321 @@ def __init__( self.client = TofuPilotClient(api_key=api_key, url=url) self.api_key = api_key self.url = url - self.loop = None - self.update_queue = None - self.event_loop_thread = None self.watcher = None self.shutdown_event = threading.Event() self.update_task = None + self.mqttClient = None + self.publishOptions = None + self._logger = self.client._logger + self._streaming_setup_thread = None def __enter__(self): - # Initialize a thread-safe asyncio.Queue + # Add upload callback without pausing the logger yet self.test.add_output_callbacks( - upload(api_key=self.api_key, url=self.url, client=self.client) + upload(api_key=self.api_key, url=self.url, client=self.client), + self._final_update, ) + # Start streaming setup before pausing the logger if self.stream: - self.update_queue = asyncio.Queue() + self.connection_completed = False - # Start the event loop in a separate thread - self.event_loop_thread = threading.Thread( - target=self.run_event_loop, daemon=True + # Start connection in a separate thread with 1s timeout + self._streaming_setup_thread = threading.Thread( + target=self._setup_streaming_with_state, daemon=True ) - self.event_loop_thread.start() - - # Wait until the event loop is ready - while self.loop is None: - sleep(0.1) - - # Start the SimpleStationWatcher with a callback to send updates - self.watcher = SimpleStationWatcher(self.send_update) - self.watcher.start() + self._streaming_setup_thread.start() + self._streaming_setup_thread.join(1.0) + # Pause logger after connection attempt is either completed or timed out + self._logger.pause() return self def __exit__(self, exc_type, exc_value, traceback): + """Clean up resources when exiting the context manager. + + This method handles proper cleanup even in the case of KeyboardInterrupt + or other exceptions to ensure resources are released properly. + """ + self._logger.resume() + + # Handle ongoing connection attempt + if self._streaming_setup_thread and self._streaming_setup_thread.is_alive(): + if ( + not hasattr(self, "connection_completed") + or not self.connection_completed + ): + self._logger.warning(f"Operator UI: Connection still in progress") + self._streaming_setup_thread.join(timeout=3.0) + if self._streaming_setup_thread.is_alive(): + self._logger.warning(f"Operator UI: Connection timed out") + # Stop the StationWatcher if self.watcher: - self.watcher.stop() - self.watcher.join() - - # Schedule the shutdown coroutine - if self.loop and not self.loop.is_closed(): - asyncio.run_coroutine_threadsafe(self.shutdown(), self.loop) - - # Wait for the event loop thread to finish - if self.event_loop_thread: - self.event_loop_thread.join() - - def send_update(self, message): - """Thread-safe method to send a message to the event loop.""" - if self.loop and not self.loop.is_closed(): - asyncio.run_coroutine_threadsafe(self.update_queue.put(message), self.loop) - - def run_event_loop(self): - """Runs the asyncio event loop in a separate thread.""" - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) - self.loop.run_until_complete(self.setup()) - self.loop.run_forever() - - async def setup(self): - """Starts the update processor.""" - # Start the coroutine that processes updates - self.update_task = asyncio.create_task(self.process_updates()) - - async def process_updates(self): - """ - Sends the current state of the test to the WebSocket server. - """ + try: + self.watcher.stop() + self.watcher.join(timeout=2.0) # Add timeout to prevent hanging + except Exception as e: + self._logger.warning(f"Error stopping watcher: {e}") + + # Clean up MQTT connection + if self.mqttClient: + try: + # Doesn't wait for publish operation to stop, this is fine since __exit__ is only called after the run was imported + self.mqttClient.loop_stop() + self.mqttClient.disconnect() + except Exception as e: + self._logger.warning(f"Error disconnecting MQTT client: {e}") + finally: + self.mqttClient = None + + # Return False to allow any exception to propagate, unless it's a KeyboardInterrupt + # In case of KeyboardInterrupt, return True to suppress the exception + return exc_type is KeyboardInterrupt + + # Operator UI-related methods + + def _setup_streaming_with_state(self): + """Run the streaming setup and track connection completion state.""" + self._setup_streaming() + self.connection_completed = True + + def _setup_streaming(self): try: - url = self.client.get_websocket_url() - - if not url: - return # Exit gracefully if no URL is provided - - retry_count = 0 - max_retries = 5 - backoff_factor = 2 # Exponential backoff base - - while retry_count < max_retries and not self.shutdown_event.is_set(): - try: - async with connect(url) as websocket: - while not self.shutdown_event.is_set(): - try: - # Fetch state update from the queue (with timeout to avoid blocking indefinitely) - state_update = await asyncio.wait_for( - self.update_queue.get(), timeout=1.0 - ) - # Send the state update to the WebSocket server - await websocket.send( - json.dumps( - {"action": "send", "message": state_update} - ) - ) - except asyncio.TimeoutError: - continue # Timeout waiting for an update; loop back - except asyncio.CancelledError: - return # Exit cleanly on task cancellation - except Exception: # pylint: disable=broad-exception-caught - break # Exit WebSocket loop on unexpected errors - except (ConnectionClosedError, OSError, InvalidURI): - retry_count += 1 - await asyncio.sleep( - backoff_factor**retry_count - ) # Exponential backoff - except asyncio.CancelledError: - return # Exit cleanly on task cancellation - except Exception: # pylint: disable=broad-exception-caught - break # Exit gracefully on unexpected errors - except Exception: # pylint: disable=broad-exception-caught - pass # Catch all remaining exceptions to ensure robustness - - async def shutdown(self): - """Cleans up resources and stops the event loop.""" - # Cancel the update task - if self.update_task is not None: - self.update_task.cancel() try: - await self.update_task - except asyncio.CancelledError: - pass + cred = self.client.get_connection_credentials() + except Exception as e: + self._logger.warning(f"Operator UI: JWT error: {e}") + # Print with yellow color for consistency with warnings + yellow = "\033[0;33m" + reset = "\033[0m" + print( + f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" + ) + self.stream = False # Disable streaming on auth failure + return + + if not cred: + self._logger.warning("Operator UI: Auth server connection failed") + # Print with yellow color for consistency with warnings + yellow = "\033[0;33m" + reset = "\033[0m" + print( + f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" + ) + self.stream = False # Disable streaming on auth failure + return self + + # Since we control the server, we know these will be set + token = cred["token"] + operatorPage = cred["operatorPage"] + clientOptions = cred["clientOptions"] + willOptions = cred["willOptions"] + connectOptions = cred["connectOptions"] + self.publishOptions = cred["publishOptions"] + subscribeOptions = cred["subscribeOptions"] + + self.mqttClient = mqtt.Client( + callback_api_version=CallbackAPIVersion.VERSION2, **clientOptions + ) + + # This is not 100% reliable, hence the need to put the setup in the background + # See https://github.com/eclipse-paho/paho.mqtt.python/issues/890 + self.mqttClient.connect_timeout = 1.0 + + self.mqttClient.tls_set() + + self.mqttClient.will_set(**willOptions) + + self.mqttClient.username_pw_set("pythonClient", token) + + self.mqttClient.on_message = self._on_message + self.mqttClient.on_disconnect = self._on_disconnect + self.mqttClient.on_unsubscribe = self._on_unsubscribe + + try: + connect_error_code = self.mqttClient.connect(**connectOptions) + except Exception as e: + self._logger.warning( + f"Operator UI: Failed to connect with server (exception): {e}" + ) + # Print with yellow color for consistency with warnings + yellow = "\033[0;33m" + reset = "\033[0m" + print( + f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" + ) + self.stream = False # Disable streaming on connection failure + return - # Stop the event loop - self.loop.stop() + if connect_error_code != mqtt.MQTT_ERR_SUCCESS: + self._logger.warning( + f"Operator UI: Failed to connect with server (error code): {connect_error_code}" + ) + # Print with yellow color for consistency with warnings + yellow = "\033[0;33m" + reset = "\033[0m" + print( + f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" + ) + self.stream = False # Disable streaming on connection failure + return self + + try: + subscribe_error_code, messageId = self.mqttClient.subscribe( + **subscribeOptions + ) + except Exception as e: + self._logger.warning( + f"Operator UI: Failed to subscribe to server (exception): {e}" + ) + # Print with yellow color for consistency with warnings + yellow = "\033[0;33m" + reset = "\033[0m" + print( + f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" + ) + self.stream = False # Disable streaming on subscription failure + return + + if subscribe_error_code != mqtt.MQTT_ERR_SUCCESS: + self._logger.warning( + f"Operator UI: Failed to subscribe to server (error code): {subscribe_error_code}" + ) + # Print with yellow color for consistency with warnings + yellow = "\033[0;33m" + reset = "\033[0m" + print( + f"{yellow}To disable Operator UI streaming, use Test(..., stream=False) in your script{reset}" + ) + self.stream = False # Disable streaming on subscription failure + return self + + self.mqttClient.loop_start() + + self.watcher = SimpleStationWatcher(self._send_update) + self.watcher.start() + + # Show connection status message with URL + try: + # Use ANSI escape sequence for clickable link + green = "\033[0;32m" + reset = "\033[0m" + + # Create clickable URL + clickable_url = ( + f"\033]8;;{operatorPage}\033\\{operatorPage}\033]8;;\033\\" + ) + + # Print single line connection message with URL + print(f"\n{green}Connected to TofuPilot real-time server{reset}") + print(f"{green}Access Operator UI: {clickable_url}{reset}\n") + except: + # Fallback for terminals that don't support ANSI + self._logger.success(f"Connected to TofuPilot real-time server") + self._logger.success(f"Access Operator UI: {operatorPage}") + + except Exception as e: + self._logger.warning(f"Operator UI: Setup error - {e}") + print( + "To disable Operator UI streaming, use Test(..., stream=False) in your script" + ) + self.stream = False # Disable streaming on any setup error + + def _send_update(self, message): + # Skip publishing if streaming is disabled or client is None + if not self.stream or self.mqttClient is None: + return + + try: + self.mqttClient.publish( + payload=json.dumps( + {"action": "send", "source": "python", "message": message} + ), + **self.publishOptions, + ) + except Exception as e: + self._logger.warning( + f"Operator UI: Failed to publish to server (exception): {e}" + ) + self.stream = False # Disable streaming on publish failure + return + + def _handle_answer(self, plug_name, method_name, args): + _, test_state = _get_executing_test() + + if test_state is None: + self._logger.warning("Operator UI: No running test found") + return + + # Find the plug matching `plug_name`. + plug = test_state.plug_manager.get_plug_by_class_path(plug_name) + if plug is None: + self._logger.warning(f"Operator UI: Plug not found - {plug_name}") + return + + method = getattr(plug, method_name, None) + + if not ( + plug.enable_remote + and isinstance(method, types.MethodType) + and not method_name.startswith("_") + and method_name not in plug.disable_remote_attrs + ): + self._logger.warning( + f"Operator UI: Method not found - {plug_name}.{method_name}" + ) + return + + try: + # side-effecting ! + method(*args) + except Exception as e: # pylint: disable=broad-except + self._logger.warning( + f"Operator UI: Method call failed - {method_name}({', '.join(args)}) - {e}" + ) + + def _final_update(self, testRecord: TestRecord): + """ + If the test is fast enough, the watcher never triggers, to avoid the UI being out of sync, + we force send at least once at the very end of the test + """ + + # Skip if streaming is disabled or MQTT client doesn't exist + if not self.stream or self.mqttClient is None: + return + + test_record_dict = testRecord.as_base_types() + + test_state_dict = { + "status": "COMPLETED", + "test_record": test_record_dict, + "plugs": {"plug_states": {}}, + "running_phase_state": {}, + } + + self._send_update(test_state_dict) + + # Operator UI-related callbacks + + def _on_message(self, client, userdata, message): + parsed = json.loads(message.payload) + + if parsed["source"] == "web": + self._handle_answer(**parsed["message"]) + + def _on_disconnect( + self, client, userdata, disconnect_flags, reason_code, properties + ): + if reason_code != mqtt.MQTT_ERR_SUCCESS: + self._logger.warning( + f"Operator UI: Unexpected disconnect (code {reason_code})" + ) + + def _on_unsubscribe(self, client, userdata, mid, reason_code_list, properties): + if any( + reason_code != mqtt.MQTT_ERR_SUCCESS for reason_code in reason_code_list + ): + self._logger.warning( + f"Operator UI: Partial disconnect (codes {reason_code_list})" + ) diff --git a/tofupilot/openhtf/upload.py b/tofupilot/openhtf/upload.py index 56737b6..c3e5f5b 100644 --- a/tofupilot/openhtf/upload.py +++ b/tofupilot/openhtf/upload.py @@ -15,6 +15,7 @@ from ..utils import ( notify_server, ) +from ..utils.logger import LoggerStateManager class upload: # pylint: disable=invalid-name @@ -63,111 +64,182 @@ def __init__( self._logger = self.client._logger self._url = self.client._url self._headers = self.client._headers - self._verify = verify + self._verify = verify # Kept for backward compatibility self._max_attachments = self.client._max_attachments self._max_file_size = self.client._max_file_size def __call__(self, test_record: TestRecord): - - # Extract relevant details from the test record - dut_id = test_record.dut_id - test_name = test_record.metadata.get("test_name") - - # Convert milliseconds to a datetime object - start_time = datetime.datetime.fromtimestamp( - test_record.start_time_millis / 1000.0 - ) - - # Format the timestamp as YYYY-MM-DD_HH_MM_SS_SSS - start_time_formatted = start_time.strftime("%Y-%m-%d_%H-%M-%S-%f")[:-3] - - temp_dir = tempfile.gettempdir() - - # Craft system-agnostic temporary filename - filename = os.path.join( - temp_dir, f"{dut_id}.{test_name}.{start_time_formatted}.json" - ) - - # Use the existing OutputToJSON callback to write to the custom file - output_callback = json_factory.OutputToJSON( - filename, - inline_attachments=False, # Exclude raw attachments - allow_nan=self.allow_nan, - indent=4, - ) - - # Open the custom file and write serialized test record to it - with open(filename, "w", encoding="utf-8") as file: - for json_line in output_callback.serialize_test_record(test_record): - file.write(json_line) + # Resume logger to ensure it's active during attachment processing + was_logger_resumed = False + if hasattr(self._logger, "resume"): + self._logger.resume() + was_logger_resumed = True try: - # Call create_run_from_report with the generated file path - run_id = self.client.upload_and_create_from_openhtf_report(filename) - finally: - # Ensure the file is deleted after processing - if os.path.exists(filename): - os.remove(filename) - - if run_id: + # Extract relevant details from the test record + dut_id = test_record.dut_id + test_name = test_record.metadata.get("test_name") + + # Convert milliseconds to a datetime object + start_time = datetime.datetime.fromtimestamp( + test_record.start_time_millis / 1000.0 + ) + + # Format the timestamp as YYYY-MM-DD_HH_MM_SS_SSS + start_time_formatted = start_time.strftime("%Y-%m-%d_%H-%M-%S-%f")[:-3] + + temp_dir = tempfile.gettempdir() + + # Craft system-agnostic temporary filename + filename = os.path.join( + temp_dir, f"{dut_id}.{test_name}.{start_time_formatted}.json" + ) + + # Use the existing OutputToJSON callback to write to the custom file + output_callback = json_factory.OutputToJSON( + filename, + inline_attachments=False, # Exclude raw attachments + allow_nan=self.allow_nan, + indent=4, + ) + + # Open the custom file and write serialized test record to it + with open(filename, "w", encoding="utf-8") as file: + for json_line in output_callback.serialize_test_record(test_record): + file.write(json_line) + + try: + # Call create_run_from_report with the generated file path + result = self.client.upload_and_create_from_openhtf_report(filename) + + # Extract run_id from response - it could be a string (id) or a dict (result with id field) + run_id = None + + if isinstance(result, dict): + # It's a dictionary response + if not result.get("success", True): + self._logger.error("Run creation failed, skipping attachments") + return + + # Try to get the ID from the dictionary + run_id = result.get("id") + else: + # Direct ID string + run_id = result + + # Final validation of run_id + if not run_id or not isinstance(run_id, str): + self._logger.error( + f"Invalid run ID received: {run_id}, skipping attachments" + ) + return + except Exception as e: + self._logger.error(f"Error creating run: {str(e)}") + return + finally: + # Ensure the file is deleted after processing + if os.path.exists(filename): + os.remove(filename) + + # Process attachments number_of_attachments = 0 - for phase in test_record.phases: + for phase_idx, phase in enumerate(test_record.phases): + # Count attachments silently + attachment_count = len(phase.attachments) + # Keep only max number of attachments if number_of_attachments >= self._max_attachments: self._logger.warning( - "Too many attachments, trimming to %d attachments.", - self._max_attachments, + f"Attachment limit ({self._max_attachments}) reached" ) break + + # Process each attachment in the phase for attachment_name, attachment in phase.attachments.items(): # Remove attachments that exceed the max file size if attachment.size > self._max_file_size: - self._logger.warning( - "File size exceeds the maximum allowed size of %d bytes: %s", - self._max_file_size, - attachment.name, - ) + self._logger.warning(f"File too large: {attachment_name}") continue if number_of_attachments >= self._max_attachments: break number_of_attachments += 1 - self._logger.info("Uploading %s...", attachment_name) + # Use LoggerStateManager to temporarily activate the logger + with LoggerStateManager(self._logger): + self._logger.info(f"Uploading attachment...") # Upload initialization initialize_url = f"{self._url}/uploads/initialize" payload = {"name": attachment_name} - response = requests.post( - initialize_url, - data=json.dumps(payload), - headers=self._headers, - verify=self._verify, - timeout=SECONDS_BEFORE_TIMEOUT, - ) - - response.raise_for_status() - response_json = response.json() - upload_url = response_json.get("uploadUrl") - upload_id = response_json.get("id") - - requests.put( - upload_url, - data=attachment.data, - headers={"Content-Type": attachment.mimetype}, - timeout=SECONDS_BEFORE_TIMEOUT, - ) + try: + response = requests.post( + initialize_url, + data=json.dumps(payload), + headers=self._headers, + verify=self._verify, + timeout=SECONDS_BEFORE_TIMEOUT, + ) - notify_server( - self._headers, - self._url, - upload_id, - run_id, - self._verify, - ) + response.raise_for_status() + response_json = response.json() + upload_url = response_json.get("uploadUrl") + upload_id = response_json.get("id") + + # Handle file attachments created with test.attach_from_file + try: + attachment_data = attachment.data + + # Some OpenHTF implementations have file path in the attachment object + if hasattr(attachment, "file_path") and getattr( + attachment, "file_path" + ): + try: + with open( + getattr(attachment, "file_path"), "rb" + ) as f: + attachment_data = f.read() + self._logger.info( + f"Read file data from {attachment.file_path}" + ) + except Exception as e: + self._logger.warning( + f"Could not read from file_path: {str(e)}" + ) + # Continue with attachment.data + + requests.put( + upload_url, + data=attachment_data, + headers={"Content-Type": attachment.mimetype}, + timeout=SECONDS_BEFORE_TIMEOUT, + ) + except Exception as e: + self._logger.error(f"Error uploading data: {str(e)}") + continue + + notify_server( + self._headers, + self._url, + upload_id, + run_id, + logger=self._logger, + ) - self._logger.success( - "Attachment %s successfully uploaded and linked to run.", - attachment_name, - ) + # Use LoggerStateManager to temporarily activate the logger + with LoggerStateManager(self._logger): + self._logger.success( + f"Uploaded attachment: {attachment_name}" + ) + except Exception as e: + # Use LoggerStateManager to temporarily activate the logger + with LoggerStateManager(self._logger): + self._logger.error( + f"Failed to process attachment: {str(e)}" + ) + continue + finally: + # For attachment logs to be visible, we intentionally don't pause the logger here + # Instead, we'll let the TofuPilot class's __exit__ method handle the logger state + pass diff --git a/tofupilot/utils/__init__.py b/tofupilot/utils/__init__.py index 7fecdd2..4038c17 100644 --- a/tofupilot/utils/__init__.py +++ b/tofupilot/utils/__init__.py @@ -1,10 +1,12 @@ -from .logger import setup_logger +from .logger import setup_logger, LoggerStateManager from .version_checker import check_latest_version from .files import ( validate_files, upload_file, notify_server, upload_attachments, + upload_attachment_data, + process_openhtf_attachments, log_and_raise, ) from .dates import ( @@ -17,15 +19,19 @@ handle_response, handle_http_error, handle_network_error, + api_request, ) __all__ = [ "setup_logger", + "LoggerStateManager", "check_latest_version", "validate_files", "upload_file", "notify_server", "upload_attachments", + "upload_attachment_data", + "process_openhtf_attachments", "parse_error_message", "timedelta_to_iso", "duration_to_iso", @@ -34,4 +40,5 @@ "handle_response", "handle_http_error", "handle_network_error", + "api_request", ] diff --git a/tofupilot/utils/files.py b/tofupilot/utils/files.py index 5dd3616..26cf4a1 100644 --- a/tofupilot/utils/files.py +++ b/tofupilot/utils/files.py @@ -3,10 +3,11 @@ from logging import Logger import os import sys -from typing import List, Dict, Optional +from typing import List, Dict, Optional, Union import requests from ..constants.requests import SECONDS_BEFORE_TIMEOUT +from .logger import LoggerStateManager def log_and_raise(logger: Logger, error_message: str): @@ -47,7 +48,7 @@ def upload_file( url: str, file_path: str, verify: Optional[str] = None, -) -> bool: +) -> str: """Initializes an upload and stores file in it Args: @@ -72,6 +73,15 @@ def upload_file( verify=verify, ) + # Check for API key errors before raising for status + if response.status_code == 401: + error_data = response.json() + error_message = error_data.get("error", {}).get("message", "Authentication failed") + # Create a proper HTTPError with the response + http_error = requests.exceptions.HTTPError(response=response) + http_error.response = response + raise http_error + response.raise_for_status() response_json = response.json() upload_url = response_json.get("uploadUrl") @@ -85,11 +95,96 @@ def upload_file( data=file, headers={"Content-Type": content_type}, timeout=SECONDS_BEFORE_TIMEOUT, + verify=verify, ) return upload_id +def notify_server( + headers: dict, url: str, upload_id: str, run_id: str, logger=None, verify: str | None = None +) -> bool: + """Tells TP server to sync upload with newly created run""" + sync_url = f"{url}/uploads/sync" + sync_payload = {"upload_id": upload_id, "run_id": run_id} + + try: + response = requests.post( + sync_url, + data=json.dumps(sync_payload), + headers=headers, + timeout=SECONDS_BEFORE_TIMEOUT, + verify=verify, + ) + response.raise_for_status() + + return True + except Exception as e: + # If logger is available, log the error properly + if logger: + with LoggerStateManager(logger): + logger.error(f"Failed to sync attachment: {str(e)}") + return False + + +def upload_attachment_data( + logger: Logger, + headers: dict, + url: str, + name: str, + data, + mimetype: str, + run_id: str, + verify: str | None, +) -> bool: + """ + Uploads binary data as an attachment and links it to a run + + Uses LoggerStateManager to ensure proper logging, similar to OpenHTF implementation. + """ + try: + initialize_url = f"{url}/uploads/initialize" + payload = {"name": name} + + response = requests.post( + initialize_url, + data=json.dumps(payload), + headers=headers, + timeout=SECONDS_BEFORE_TIMEOUT, + verify=verify, + ) + response.raise_for_status() + + # Get upload details + response_json = response.json() + upload_url = response_json.get("uploadUrl") + upload_id = response_json.get("id") + + # Upload the actual data + content_type = mimetype or "application/octet-stream" + upload_response = requests.put( + upload_url, + data=data, + headers={"Content-Type": content_type}, + timeout=SECONDS_BEFORE_TIMEOUT, + verify=verify, + ) + upload_response.raise_for_status() + + # Link attachment to run + notify_server(headers, url, upload_id, run_id, logger) + + # Log success with LoggerStateManager for visibility + with LoggerStateManager(logger): + logger.success(f"Uploaded attachment: {name}") + return True + except Exception as e: + # Log error with LoggerStateManager for visibility + with LoggerStateManager(logger): + logger.error(f"Upload failed: {name} - {str(e)}") + return False + + def notify_server( headers: dict, url: str, @@ -127,11 +222,14 @@ def upload_attachments( logger: Logger, headers: dict, url: str, - paths: List[Dict[str, Optional[str]]], + paths: List[str], run_id: str, verify: Optional[str] = None, ): """Creates one upload per file and stores them into TofuPilot + + Uses LoggerStateManager to ensure logging is properly handled during the upload process, + similar to the OpenHTF implementation.s Args: logger (Logger): Logger instance @@ -141,12 +239,250 @@ def upload_attachments( run_id (str): ID of the run to link files to verify (Optional[str]): Path to a CA bundle file to verify the server certificate """ + # Print a visual separator before attachment uploads + print("") + for file_path in paths: - logger.info("Uploading %s...", file_path) + # Use LoggerStateManager to ensure logger is active for each file + with LoggerStateManager(logger): + logger.info(f"Uploading attachment: {file_path}") - upload_id = upload_file(headers, url, file_path, verify) - notify_server(headers, url, upload_id, run_id, verify) + try: + # Verify file exists + if not os.path.exists(file_path): + with LoggerStateManager(logger): + logger.error(f"File not found: {file_path}") + continue - logger.success( - f"Attachment {file_path} successfully uploaded and linked to run." - ) + # Open file and prepare for upload + with open(file_path, "rb") as file: + name = os.path.basename(file_path) + data = file.read() + mimetype, _ = ( + mimetypes.guess_type(file_path) or "application/octet-stream" + ) + + # Use shared upload function + upload_attachment_data( + logger, headers, url, name, data, mimetype, run_id, verify + ) + except Exception as e: + # Use LoggerStateManager to ensure error is visible + with LoggerStateManager(logger): + logger.error(f"Upload failed: {file_path} - {str(e)}") + continue + + +def process_openhtf_attachments( + logger: Logger, + headers: dict, + url: str, + test_record: Union[Dict, object], + run_id: str, + max_attachments: int, + max_file_size: int, + needs_base64_decode: bool = True, + verify: str | None = None, +) -> None: + """ + Process attachments from an OpenHTF test record and upload them. + + This function centralizes the attachment processing logic used in both the + direct TofuPilotClient.create_run_from_openhtf_report and the OpenHTF output callback. + + Uses LoggerStateManager to ensure proper logging visibility throughout the process, + similar to the OpenHTF implementation. + + Args: + logger: Logger for output messages + headers: HTTP headers for API authentication + url: Base API URL + test_record: OpenHTF test record (either as dict or object) + run_id: ID of the run to attach files to + max_attachments: Maximum number of attachments to process + max_file_size: Maximum size per attachment + needs_base64_decode: Whether attachment data is base64 encoded (true for dict format) + """ + # Print a visual separator + print("") + + # Use LoggerStateManager instead of directly resuming/pausing + with LoggerStateManager(logger): + logger.info("Processing attachments from test record") + + try: + attachment_count = 0 + + # Extract phases from test record based on type + if isinstance(test_record, dict): + phases = test_record.get("phases", []) + with LoggerStateManager(logger): + logger.info(f"Found {len(phases)} phases in JSON test record") + else: + phases = getattr(test_record, "phases", []) + with LoggerStateManager(logger): + logger.info(f"Found {len(phases)} phases in object test record") + + # Iterate through phases and their attachments + for i, phase in enumerate(phases): + # Skip if we've reached attachment limit + if attachment_count >= max_attachments: + with LoggerStateManager(logger): + logger.warning(f"Attachment limit ({max_attachments}) reached") + break + + # Get attachments based on record type + if isinstance(test_record, dict): + phase_attachments = phase.get("attachments", {}) + phase_name = phase.get("name", f"Phase {i}") + else: + phase_attachments = getattr(phase, "attachments", {}) + phase_name = getattr(phase, "name", f"Phase {i}") + + # Skip if phase has no attachments + if not phase_attachments: + continue + + with LoggerStateManager(logger): + logger.info( + f"Processing {len(phase_attachments)} attachments in {phase_name}" + ) + + # Process each attachment in the phase + for name, attachment in phase_attachments.items(): + # Skip if we've reached attachment limit + if attachment_count >= max_attachments: + break + + # Debug attachment details (using debug level to avoid cluttering the console) + if isinstance(test_record, dict): + with LoggerStateManager(logger): + logger.debug(f"Attachment: {name}, Type: JSON format") + else: + attrs = [ + attr for attr in dir(attachment) if not attr.startswith("_") + ] + with LoggerStateManager(logger): + logger.debug( + f"Attachment: {name}, Type: Object, Attributes: {attrs}" + ) + + # Get attachment data and size based on record type + if isinstance(test_record, dict): + # Dict format (from JSON file) + attachment_data = attachment.get("data", "") + if not attachment_data: + with LoggerStateManager(logger): + logger.warning(f"No data in: {name}") + continue + + try: + if needs_base64_decode: + import base64 + + data = base64.b64decode(attachment_data) + else: + data = attachment_data + + attachment_size = len(data) + mimetype = attachment.get( + "mimetype", "application/octet-stream" + ) + except Exception as e: + with LoggerStateManager(logger): + logger.error( + f"Failed to process attachment data: {name} - {str(e)}" + ) + continue + else: + # Object format (from callback) + attachment_data = getattr(attachment, "data", None) + + # Handle different attachment types in OpenHTF + if attachment_data is None: + with LoggerStateManager(logger): + logger.warning(f"No data in: {name}") + continue + + # Handle file-based attachments in different formats + data = None + + # Option 1: Check for direct file_path attribute + if hasattr(attachment, "file_path") and getattr( + attachment, "file_path" + ): + try: + file_path = getattr(attachment, "file_path") + with LoggerStateManager(logger): + logger.info(f"Found file_path attribute: {file_path}") + with open(file_path, "rb") as f: + data = f.read() + except Exception as e: + with LoggerStateManager(logger): + logger.error(f"Failed to read from file_path: {str(e)}") + + # Option 2: Check for filename attribute (used in some OpenHTF versions) + elif hasattr(attachment, "filename") and getattr( + attachment, "filename" + ): + try: + file_path = getattr(attachment, "filename") + with LoggerStateManager(logger): + logger.info(f"Found filename attribute: {file_path}") + with open(file_path, "rb") as f: + data = f.read() + except Exception as e: + with LoggerStateManager(logger): + logger.error(f"Failed to read from filename: {str(e)}") + + # Option 3: Use the data attribute directly + else: + with LoggerStateManager(logger): + logger.info("Using data attribute directly") + data = attachment_data + + # Verify we have valid data + if data is None: + with LoggerStateManager(logger): + logger.error(f"No valid data found for attachment: {name}") + continue + + # Get size from attribute or calculate it + attachment_size = getattr(attachment, "size", len(data)) + mimetype = getattr( + attachment, "mimetype", "application/octet-stream" + ) + + # Skip oversized attachments + if attachment_size > max_file_size: + with LoggerStateManager(logger): + logger.warning(f"File too large: {name}") + continue + + # Increment counter and process the attachment + attachment_count += 1 + + # Use unified attachment upload function - logging is handled inside this function + try: + success = upload_attachment_data( + logger, + headers, + url, + name, + data, + mimetype, + run_id, + verify + ) + + # Don't log success/failure here as it's already logged in upload_attachment_data + except Exception as e: + with LoggerStateManager(logger): + logger.error( + f"Exception during attachment upload: {name} - {str(e)}" + ) + # Continue with other attachments regardless of success/failure + finally: + # We intentionally don't pause the logger here, as in the OpenHTF implementation + # This allows any final log messages to be visible + pass diff --git a/tofupilot/utils/logger.py b/tofupilot/utils/logger.py index 4cc2efd..2cc5294 100644 --- a/tofupilot/utils/logger.py +++ b/tofupilot/utils/logger.py @@ -1,5 +1,10 @@ import logging import sys +import os +import time +import threading +import queue +from datetime import datetime # Define a custom log level for success messages SUCCESS_LEVEL_NUM = 25 @@ -7,7 +12,7 @@ def success(self, message, *args, **kws): - """Log a success message.""" + """Log success at custom level.""" if self.isEnabledFor(SUCCESS_LEVEL_NUM): self._log(SUCCESS_LEVEL_NUM, message, args, **kws) @@ -15,41 +20,197 @@ def success(self, message, *args, **kws): logging.Logger.success = success +def add_pause_methods_to_logger(logger): + """Add pause/resume functionality to a logger""" + original_handlers = list(logger.handlers) + + def pause(): + """Temporarily disable all handlers""" + for handler in logger.handlers: + logger.removeHandler(handler) + + def resume(): + """Re-enable handlers""" + for handler in original_handlers: + if handler not in logger.handlers: + logger.addHandler(handler) + + # Add methods to logger + logger.pause = pause + logger.resume = resume + + return logger + + +class LoggerStateManager: + """Context manager for temporarily ensuring logger is active""" + + def __init__(self, logger): + self.logger = logger + self.was_resumed = False + + def __enter__(self): + # Ensure logger is active for this block + if hasattr(self.logger, 'resume'): + self.logger.resume() + self.was_resumed = True + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + # If we resumed the logger, restore to paused state + if self.was_resumed and hasattr(self.logger, 'pause'): + self.logger.pause() + + +class TofupilotFormatter(logging.Formatter): + """Minimal formatter with colors and no timestamp.""" + + # ANSI color codes + RESET = "\033[0m" + BLUE = "\033[0;34m" + GREEN = "\033[0;32m" + YELLOW = "\033[0;33m" + RED = "\033[0;31m" + RED_BG = "\033[1;41m" + GRAY = "\033[0;37m" + BOLD = "\033[1m" + + # Log level name mapping + LEVEL_NAMES = { + logging.DEBUG: "DBG", + logging.INFO: "INF", + logging.WARNING: "WRN", + logging.ERROR: "ERR", + logging.CRITICAL: "CRT", + SUCCESS_LEVEL_NUM: "OK!", + } + + # Color mapping for levels + LEVEL_COLORS = { + logging.DEBUG: GRAY, + logging.INFO: BLUE, + logging.WARNING: YELLOW, + logging.ERROR: RED, + logging.CRITICAL: RED_BG, + SUCCESS_LEVEL_NUM: GREEN, + } + + def __init__(self): + """Initialize formatter.""" + super().__init__() + + def format(self, record): + """Format log with minimal prefix and colors.""" + # Get log level color and short name + level_color = self.LEVEL_COLORS.get(record.levelno, self.RESET) + level_name = self.LEVEL_NAMES.get(record.levelno, "???") + + # Create minimal prefix with no timestamp + prefix = f"{level_color}{self.BOLD}TP{self.RESET}{level_color}:{level_name} " + + # Add log message with color + message = record.getMessage() + formatted_message = f"{prefix}{message}{self.RESET}" + + # Add exception info if present + if record.exc_info: + exc_text = self.formatException(record.exc_info) + formatted_message += f"\n{exc_text}" + + return formatted_message + + +# Simple formatter for backward compatibility class CustomFormatter(logging.Formatter): - """Custom formatter to add symbols and colors to the log messages.""" + """Simple formatter with no timestamp.""" reset_code = "\033[0m" format_dict = { - logging.DEBUG: "\033[0;37m%(asctime)s - DEBUG: %(message)s" - + reset_code, # White - logging.INFO: "\033[0;34m%(asctime)s - ℹ️ %(message)s" - + reset_code, # Blue with info symbol - logging.WARNING: "\033[0;33m%(asctime)s - ⚠️ %(message)s" - + reset_code, # Yellow with warning symbol - logging.ERROR: "\033[0;31m%(asctime)s - ❌ %(message)s" - + reset_code, # Red with cross mark - logging.CRITICAL: "\033[1;41m%(asctime)s - 🚨 %(message)s" - + reset_code, # White on red background with alarm symbol - SUCCESS_LEVEL_NUM: "\033[0;32m%(asctime)s - ✅ %(message)s" - + reset_code, # Green with checkmark + logging.DEBUG: "\033[0;37mDEBUG: %(message)s" + reset_code, + logging.INFO: "\033[0;34mINFO: %(message)s" + reset_code, + logging.WARNING: "\033[0;33mWARN: %(message)s" + reset_code, + logging.ERROR: "\033[0;31mERROR: %(message)s" + reset_code, + logging.CRITICAL: "\033[1;41mCRIT: %(message)s" + reset_code, + SUCCESS_LEVEL_NUM: "\033[0;32mSUCCESS: %(message)s" + reset_code, } def format(self, record): - """Format the specified record as text.""" + """Format record with minimal prefix.""" log_fmt = self.format_dict.get(record.levelno, self._fmt) - formatter = logging.Formatter(log_fmt, datefmt="%Y-%m-%d %H:%M:%S") + formatter = logging.Formatter(log_fmt) return formatter.format(record) -def setup_logger(log_level: int): - """Set up the logger with a custom formatter and stream handler.""" - logger = logging.getLogger(__name__) +class LogLevelFilter(logging.Filter): + """Sets log level from environment variable.""" + + def __init__(self, env_var='TOFUPILOT_LOG_LEVEL'): + """Init with env var name.""" + super().__init__() + self.env_var = env_var + self.level = self._get_level_from_env() + + def _get_level_from_env(self): + """Get level from env var.""" + level_str = os.environ.get(self.env_var, 'INFO').upper() + level_map = { + 'DEBUG': logging.DEBUG, + 'INFO': logging.INFO, + 'SUCCESS': SUCCESS_LEVEL_NUM, + 'WARNING': logging.WARNING, + 'WARN': logging.WARNING, + 'ERROR': logging.ERROR, + 'CRITICAL': logging.CRITICAL + } + return level_map.get(level_str, logging.INFO) + + def filter(self, record): + """Apply dynamic level filtering.""" + # Reload level from env for each record to allow runtime changes + self.level = self._get_level_from_env() + return record.levelno >= self.level + + +def setup_logger(log_level=None, advanced_format=True): + """Configure logger with minimal formatting and color support. + + Args: + log_level: Override env var TOFUPILOT_LOG_LEVEL + advanced_format: Use TofupilotFormatter (default) or CustomFormatter + + Returns: + Configured logger instance + """ + # Maintain backward compatibility with __name__ + logger_name = "tofupilot" + logger = logging.getLogger(logger_name) + + # Clear existing handlers + if logger.handlers: + logger.handlers.clear() + + # Set level from arg or environment + level_filter = LogLevelFilter() + log_level = log_level or level_filter.level logger.setLevel(log_level) + + # Create handler handler = logging.StreamHandler(sys.stdout) handler.setLevel(log_level) - handler.setFormatter(CustomFormatter()) - if not logger.handlers: - logger.addHandler(handler) - - return logger + + # Choose formatter based on preference + if advanced_format: + handler.setFormatter(TofupilotFormatter()) + else: + handler.setFormatter(CustomFormatter()) + + handler.addFilter(level_filter) + + # Add handler to logger + logger.addHandler(handler) + + # Add pause/resume functionality + logger = add_pause_methods_to_logger(logger) + + return logger \ No newline at end of file diff --git a/tofupilot/utils/network.py b/tofupilot/utils/network.py index 5adae43..67378a4 100644 --- a/tofupilot/utils/network.py +++ b/tofupilot/utils/network.py @@ -1,9 +1,11 @@ from typing import Dict, List, Optional, Any import requests +from ..constants.requests import SECONDS_BEFORE_TIMEOUT def parse_error_message(response: requests.Response) -> str: + """Extract error message from response""" try: error_data = response.json() return error_data.get("error", {}).get( @@ -13,6 +15,31 @@ def parse_error_message(response: requests.Response) -> str: return f"HTTP error occurred: {response.text}" +def api_request( + logger, method: str, url: str, headers: Dict, + data: Optional[Dict] = None, + params: Optional[Dict] = None, + timeout: int = SECONDS_BEFORE_TIMEOUT, + verify: str | None = None, +) -> Dict: + """Unified API request handler with consistent error handling""" + try: + response = requests.request( + method, url, + json=data, + headers=headers, + params=params, + timeout=timeout, + verify=verify, + ) + response.raise_for_status() + return handle_response(logger, response) + except requests.exceptions.HTTPError as http_err: + return handle_http_error(logger, http_err) + except requests.RequestException as e: + return handle_network_error(logger, e) + + def handle_response( logger, response: requests.Response, @@ -23,16 +50,42 @@ def handle_response( """ data = response.json() - # Logging warnings if present - warnings: Optional[List[str]] = data.get("warnings") - if warnings is not None: - for warning in warnings: - logger.warning(warning) - - # Logging success message if the JSON has one - message = data.get("message") - if message is not None: - logger.success(message) + # Ensure logger is active to process messages + was_resumed = False + if hasattr(logger, 'resume'): + logger.resume() + was_resumed = True + + try: + # Process warnings + warnings: Optional[List[str]] = data.get("warnings") + if warnings is not None: + for warning in warnings: + logger.warning(warning) + + # Process errors + errors = data.get("errors") or data.get("error") + if errors: + # Handle both array and single object formats + if isinstance(errors, list): + for error in errors: + if isinstance(error, dict) and "message" in error: + logger.error(error["message"]) + else: + logger.error(str(error)) + elif isinstance(errors, dict) and "message" in errors: + logger.error(errors["message"]) + elif isinstance(errors, str): + logger.error(errors) + + # Process success message + message = data.get("message") + if message is not None: + logger.success(message) + finally: + # Restore logger state if needed + if was_resumed and hasattr(logger, 'pause'): + logger.pause() # Returning the parsed JSON to the caller return data @@ -42,27 +95,42 @@ def handle_http_error( logger, http_err: requests.exceptions.HTTPError ) -> Dict[str, Any]: """Handles HTTP errors and logs them.""" - - warnings = None # Initialize warnings to None - - # Check if the response body is not empty and Content-Type is application/json - if ( - http_err.response.text.strip() - and http_err.response.headers.get("Content-Type") == "application/json" - ): - # Parse JSON safely - response_json = http_err.response.json() - warnings = response_json.get("warnings") - if warnings is not None: - for warning in warnings: - logger.warning(warning) - error_message = parse_error_message(http_err.response) - else: - # Handle cases where response is empty or non-JSON - error_message = http_err - - logger.error(error_message) - + warnings = None + + # Ensure logger is active to process messages + was_resumed = False + if hasattr(logger, 'resume'): + logger.resume() + was_resumed = True + + try: + # Extract error details from JSON response when available + if (http_err.response.text.strip() and + http_err.response.headers.get("Content-Type") == "application/json"): + try: + response_json = http_err.response.json() + + # Process warnings if present + warnings = response_json.get("warnings") + if warnings: + for warning in warnings: + logger.warning(warning) + + # Get error message + error_message = parse_error_message(http_err.response) + except ValueError: + error_message = str(http_err) + else: + error_message = str(http_err) + + # Log the error through the logger for proper formatting + logger.error(error_message) + finally: + # Restore logger state if needed + if was_resumed and hasattr(logger, 'pause'): + logger.pause() + + # Return structured error info return { "success": False, "message": None, @@ -74,7 +142,27 @@ def handle_http_error( def handle_network_error(logger, e: requests.RequestException) -> Dict[str, Any]: """Handles network errors and logs them.""" - logger.error(f"Network error: {e}") + # Ensure logger is active to process messages + was_resumed = False + if hasattr(logger, 'resume'): + logger.resume() + was_resumed = True + + try: + error_message = f"Network error: {str(e)}" + logger.error(error_message) + + # Provide SSL-specific guidance + if isinstance(e, requests.exceptions.SSLError) or "SSL" in str(e) or "certificate verify failed" in str(e): + logger.warning("SSL certificate verification error detected") + logger.warning("This is typically caused by missing or invalid SSL certificates") + logger.warning("Try: 1) pip install certifi 2) /Applications/Python*/Install Certificates.command") + finally: + # Restore logger state if needed + if was_resumed and hasattr(logger, 'pause'): + logger.pause() + + # Return structured error info return { "success": False, "message": None, diff --git a/tofupilot/utils/version_checker.py b/tofupilot/utils/version_checker.py index b017acc..b823519 100644 --- a/tofupilot/utils/version_checker.py +++ b/tofupilot/utils/version_checker.py @@ -22,7 +22,7 @@ def check_latest_version(logger, current_version, package_name: str): ) logger.warning(warning_message) except PackageNotFoundError: - logger.info(f"Package {package_name} is not installed.") + logger.info(f"Package not installed: {package_name}") except requests.RequestException as e: - logger.warning(f"Error checking the latest version: {e}") + logger.warning(f"Version check failed: {e}")