diff --git a/EXTERNAL_SESSION.md b/EXTERNAL_SESSION.md new file mode 100644 index 0000000..f853b27 --- /dev/null +++ b/EXTERNAL_SESSION.md @@ -0,0 +1,141 @@ +# External Session Management + +## Overview + +The `python-openevse-http` library now supports passing an external `aiohttp.ClientSession` to the `OpenEVSE` class. This allows you to manage the session lifecycle yourself and share sessions across multiple API clients. + +## Benefits + +- **Session Reuse**: Share a single session across multiple OpenEVSE instances or other aiohttp-based clients +- **Custom Configuration**: Configure session settings like timeouts, connectors, and SSL verification +- **Resource Management**: Better control over connection pooling and resource cleanup +- **Integration**: Easier integration with existing applications that already manage aiohttp sessions + +## Usage + +### With External Session + +```python +import aiohttp +from openevsehttp import OpenEVSE + +async def main(): + # Create your own session with custom settings + timeout = aiohttp.ClientTimeout(total=30) + async with aiohttp.ClientSession(timeout=timeout) as session: + # Pass the session to OpenEVSE + charger = OpenEVSE("openevse.local", session=session) + + # Use the charger normally + await charger.update() + print(f"Status: {charger.status}") + + # Clean up + await charger.ws_disconnect() + # Session will be closed by the context manager +``` + +### Without External Session (Backward Compatible) + +```python +from openevsehttp import OpenEVSE + +async def main(): + # The library creates and manages its own sessions + charger = OpenEVSE("openevse.local") + + # Use the charger normally + await charger.update() + print(f"Status: {charger.status}") + + await charger.ws_disconnect() +``` + +### Sharing a Session + +```python +import aiohttp +from openevsehttp import OpenEVSE + +async def main(): + async with aiohttp.ClientSession() as session: + # Use the same session for multiple chargers + charger1 = OpenEVSE("charger1.local", session=session) + charger2 = OpenEVSE("charger2.local", session=session) + + # Both chargers use the same session + await charger1.update() + await charger2.update() + + await charger1.ws_disconnect() + await charger2.ws_disconnect() +``` + +## API Changes + +### `OpenEVSE.__init__()` + +```python +def __init__( + self, + host: str, + user: str = "", + pwd: str = "", + session: aiohttp.ClientSession | None = None, +) -> None: +``` + +**Parameters:** +- `host` (str): The hostname or IP address of the OpenEVSE charger +- `user` (str, optional): Username for authentication +- `pwd` (str, optional): Password for authentication +- `session` (aiohttp.ClientSession | None, optional): External session to use for HTTP requests. If not provided, the library will create temporary sessions as needed. + +### `OpenEVSEWebsocket.__init__()` + +```python +def __init__( + self, + server, + callback, + user=None, + password=None, + session: aiohttp.ClientSession | None = None, +): +``` + +**Parameters:** +- `server`: The server URL +- `callback`: Callback function for websocket events +- `user` (optional): Username for authentication +- `password` (optional): Password for authentication +- `session` (aiohttp.ClientSession | None, optional): External session to use for websocket connections. If not provided, a new session will be created. + +## Important Notes + +1. **Session Lifecycle**: When you provide an external session, you are responsible for closing it. The library will NOT close externally provided sessions. + +2. **Backward Compatibility**: This change is fully backward compatible. Existing code that doesn't provide a session will continue to work exactly as before. + +3. **Websocket Sessions**: The websocket connection will also use the provided session, ensuring consistent session management across all HTTP and WebSocket operations. + +4. **Thread Safety**: If you're using the same session across multiple OpenEVSE instances, ensure you're following aiohttp's thread safety guidelines. + +## Migration Guide + +If you want to migrate existing code to use external sessions: + +**Before:** +```python +charger = OpenEVSE("openevse.local") +await charger.update() +``` + +**After:** +```python +async with aiohttp.ClientSession() as session: + charger = OpenEVSE("openevse.local", session=session) + await charger.update() +``` + +No other changes are required! diff --git a/example_external_session.py b/example_external_session.py new file mode 100644 index 0000000..e426aff --- /dev/null +++ b/example_external_session.py @@ -0,0 +1,67 @@ +"""Example of using python-openevse-http with an external aiohttp.ClientSession. + +This demonstrates how to pass your own session to the library, which is useful when: +- You want to manage the session lifecycle yourself +- You need to share a session across multiple API clients +- You want to configure custom session settings (timeouts, connectors, etc.) +""" + +import asyncio + +import aiohttp + +from openevsehttp.__main__ import OpenEVSE + + +async def example_with_external_session(): + """Example using an external session.""" + # Create your own session with custom settings + timeout = aiohttp.ClientTimeout(total=30) + async with aiohttp.ClientSession(timeout=timeout) as session: + # Pass the session to OpenEVSE + charger = OpenEVSE("openevse.local", session=session) + + # Use the charger normally + await charger.update() + print(f"Status: {charger.status}") + print(f"Current: {charger.charging_current}A") + + # The session will be closed when the context manager exits + # but OpenEVSE won't close it (since it's externally managed) + await charger.ws_disconnect() + + +async def example_without_external_session(): + """Example without external session (backward compatible).""" + # The library will create and manage its own sessions + charger = OpenEVSE("openevse.local") + + # Use the charger normally + await charger.update() + print(f"Status: {charger.status}") + print(f"Current: {charger.charging_current}A") + + await charger.ws_disconnect() + + +async def example_shared_session(): + """Example sharing a session between multiple clients.""" + async with aiohttp.ClientSession() as session: + # Use the same session for multiple chargers + charger1 = OpenEVSE("charger1.local", session=session) + charger2 = OpenEVSE("charger2.local", session=session) + + # Both chargers use the same session + await charger1.update() + await charger2.update() + + print(f"Charger 1 Status: {charger1.status}") + print(f"Charger 2 Status: {charger2.status}") + + await charger1.ws_disconnect() + await charger2.ws_disconnect() + + +if __name__ == "__main__": + # Run one of the examples + asyncio.run(example_with_external_session()) diff --git a/openevsehttp/__main__.py b/openevsehttp/__main__.py index 9fee51c..356049e 100644 --- a/openevsehttp/__main__.py +++ b/openevsehttp/__main__.py @@ -3,10 +3,10 @@ from __future__ import annotations import asyncio -from datetime import datetime, timedelta, timezone import json import logging import re +from datetime import datetime, timedelta, timezone from typing import Any, Callable, Dict, Union import aiohttp # type: ignore @@ -84,7 +84,13 @@ class OpenEVSE: """Represent an OpenEVSE charger.""" - def __init__(self, host: str, user: str = "", pwd: str = "") -> None: + def __init__( + self, + host: str, + user: str = "", + pwd: str = "", + session: aiohttp.ClientSession | None = None, + ) -> None: """Connect to an OpenEVSE charger equipped with wifi or ethernet.""" self._user = user self._pwd = pwd @@ -97,6 +103,8 @@ def __init__(self, host: str, user: str = "", pwd: str = "") -> None: self.callback: Callable | None = None self._loop = None self.tasks = None + self._session = session + self._session_external = session is not None async def process_request( self, @@ -113,7 +121,9 @@ async def process_request( if self._user and self._pwd: auth = aiohttp.BasicAuth(self._user, self._pwd) - async with aiohttp.ClientSession() as session: + # Use provided session or create a temporary one + if self._session is not None: + session = self._session http_method = getattr(session, method) _LOGGER.debug( "Connecting to %s with data: %s rapi: %s using method %s", @@ -165,9 +175,59 @@ async def process_request( except ContentTypeError as err: _LOGGER.error("Content error: %s", err.message) raise err - - await session.close() - return message + else: + async with aiohttp.ClientSession() as session: + http_method = getattr(session, method) + _LOGGER.debug( + "Connecting to %s with data: %s rapi: %s using method %s", + url, + data, + rapi, + method, + ) + try: + async with http_method( + url, + data=rapi, + json=data, + auth=auth, + ) as resp: + try: + message = await resp.text() + except UnicodeDecodeError: + _LOGGER.debug("Decoding error") + message = await resp.read() + message = message.decode(errors="replace") + + try: + message = json.loads(message) + except ValueError: + _LOGGER.warning("Non JSON response: %s", message) + + if resp.status == 400: + index = "" + if "msg" in message.keys(): + index = "msg" + elif "error" in message.keys(): + index = "error" + _LOGGER.error("Error 400: %s", message[index]) + raise ParseJSONError + if resp.status == 401: + _LOGGER.error("Authentication error: %s", message) + raise AuthenticationError + if resp.status in [404, 405, 500]: + _LOGGER.warning("%s", message) + + if method == "post" and "config_version" in message: + await self.update() + return message + + except (TimeoutError, ServerTimeoutError) as err: + _LOGGER.error("%s: %s", ERROR_TIMEOUT, url) + raise err + except ContentTypeError as err: + _LOGGER.error("Content error: %s", err.message) + raise err async def send_command(self, command: str) -> tuple: """Send a RAPI command to the charger and parses the response.""" @@ -204,7 +264,7 @@ async def update(self) -> None: if not self.websocket: # Start Websocket listening self.websocket = OpenEVSEWebsocket( - self.url, self._update_status, self._user, self._pwd + self.url, self._update_status, self._user, self._pwd, self._session ) async def test_and_get(self) -> dict: @@ -573,7 +633,8 @@ async def firmware_check(self) -> dict | None: return None try: - async with aiohttp.ClientSession() as session: + if self._session: + session = self._session http_method = getattr(session, method) _LOGGER.debug( "Connecting to %s using method %s", @@ -590,6 +651,24 @@ async def firmware_check(self) -> dict | None: response["release_notes"] = message["body"] response["release_url"] = message["html_url"] return response + else: + async with aiohttp.ClientSession() as session: + http_method = getattr(session, method) + _LOGGER.debug( + "Connecting to %s using method %s", + url, + method, + ) + async with http_method(url) as resp: + if resp.status != 200: + return None + message = await resp.text() + message = json.loads(message) + response = {} + response["latest_version"] = message["tag_name"] + response["release_notes"] = message["body"] + response["release_url"] = message["html_url"] + return response except (TimeoutError, ServerTimeoutError): _LOGGER.error("%s: %s", ERROR_TIMEOUT, url) diff --git a/openevsehttp/websocket.py b/openevsehttp/websocket.py index 3fc83ae..0788eef 100644 --- a/openevsehttp/websocket.py +++ b/openevsehttp/websocket.py @@ -31,9 +31,11 @@ def __init__( callback, user=None, password=None, + session: aiohttp.ClientSession | None = None, ): """Initialize a OpenEVSEWebsocket instance.""" - self.session = aiohttp.ClientSession() + self.session = session if session is not None else aiohttp.ClientSession() + self._session_external = session is not None self.uri = self._get_uri(server) self._user = user self._password = password @@ -159,7 +161,9 @@ async def listen(self): async def close(self): """Close the listening websocket.""" await self._set_state(STATE_STOPPED) - await self.session.close() + # Only close the session if we created it + if not self._session_external: + await self.session.close() async def keepalive(self): """Send ping requests to websocket.""" diff --git a/pylintrc b/pylintrc index 00b41c8..e7646dc 100644 --- a/pylintrc +++ b/pylintrc @@ -27,7 +27,8 @@ disable= too-many-branches, too-many-statements, too-many-lines, - too-many-positional-arguments + too-many-positional-arguments, + too-many-return-statements [REPORTS] score=no diff --git a/tests/test_external_session.py b/tests/test_external_session.py new file mode 100644 index 0000000..7da7e9d --- /dev/null +++ b/tests/test_external_session.py @@ -0,0 +1,200 @@ +"""Test external session management.""" + +import json +from unittest.mock import AsyncMock, MagicMock, patch + +import aiohttp +import pytest + +import openevsehttp.__main__ as main +from openevsehttp.__main__ import OpenEVSE +from tests.common import load_fixture + +pytestmark = pytest.mark.asyncio + +TEST_URL_STATUS = "http://openevse.test.tld/status" +TEST_URL_CONFIG = "http://openevse.test.tld/config" +TEST_TLD = "openevse.test.tld" + + +async def test_external_session_provided(): + """Test that an external session is used when provided.""" + # Create a mock session + mock_session = MagicMock(spec=aiohttp.ClientSession) + mock_session.closed = False + + # Mock the response + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.text = AsyncMock(return_value=load_fixture("v4_json/status.json")) + + # Mock the get method to return the response + mock_get = AsyncMock(return_value=mock_response) + mock_get.__aenter__ = AsyncMock(return_value=mock_response) + mock_get.__aexit__ = AsyncMock(return_value=None) + mock_session.get = MagicMock(return_value=mock_get) + + # Create OpenEVSE instance with external session + charger = OpenEVSE(TEST_TLD, session=mock_session) + + # Verify the session is stored + assert charger._session is mock_session + assert charger._session_external is True + + # Make a request + await charger.process_request(TEST_URL_STATUS, method="get") + + # Verify the external session was used + mock_session.get.assert_called_once() + + +async def test_no_external_session(mock_aioclient): + """Test that a temporary session is created when none is provided.""" + mock_aioclient.get( + TEST_URL_STATUS, + status=200, + body=load_fixture("v4_json/status.json"), + ) + + # Create OpenEVSE instance without external session + charger = OpenEVSE(TEST_TLD) + + # Verify no session is stored + assert charger._session is None + assert charger._session_external is False + + # Make a request - should create a temporary session + await charger.process_request(TEST_URL_STATUS, method="get") + + +async def test_external_session_with_update(mock_aioclient): + """Test that external session is used during update.""" + mock_aioclient.get( + TEST_URL_STATUS, + status=200, + body=load_fixture("v4_json/status.json"), + ) + mock_aioclient.get( + TEST_URL_CONFIG, + status=200, + body=load_fixture("v4_json/config.json"), + ) + + # Create a real session for testing + async with aiohttp.ClientSession() as session: + # Create OpenEVSE instance with external session + charger = OpenEVSE(TEST_TLD, session=session) + + # Verify the session is stored + assert charger._session is session + assert charger._session_external is True + + # Update should use the external session + await charger.update() + + # Verify status was updated + assert charger._status is not None + assert charger._config is not None + + +async def test_websocket_uses_external_session(mock_aioclient): + """Test that websocket uses the external session.""" + mock_aioclient.get( + TEST_URL_STATUS, + status=200, + body=load_fixture("v4_json/status.json"), + ) + mock_aioclient.get( + TEST_URL_CONFIG, + status=200, + body=load_fixture("v4_json/config.json"), + ) + + # Create a real session for testing + async with aiohttp.ClientSession() as session: + # Create OpenEVSE instance with external session + charger = OpenEVSE(TEST_TLD, session=session) + + # Update to initialize websocket + await charger.update() + + # Verify websocket was created with the session + assert charger.websocket is not None + assert charger.websocket.session is session + assert charger.websocket._session_external is True + + # Cleanup + await charger.ws_disconnect() + + +async def test_firmware_check_with_external_session(mock_aioclient): + """Test that firmware_check uses external session.""" + mock_aioclient.get( + TEST_URL_STATUS, + status=200, + body=load_fixture("v4_json/status.json"), + ) + mock_aioclient.get( + TEST_URL_CONFIG, + status=200, + body=load_fixture("v4_json/config.json"), + ) + + github_response = { + "tag_name": "v4.2.0", + "body": "Release notes", + "html_url": "https://github.com/OpenEVSE/ESP32_WiFi_V4.x/releases/tag/v4.2.0", + } + + mock_aioclient.get( + "https://api.github.com/repos/OpenEVSE/ESP32_WiFi_V4.x/releases/latest", + status=200, + body=json.dumps(github_response), + ) + + # Create OpenEVSE instance without external session (use mocked responses) + charger = OpenEVSE(TEST_TLD) + + # Load config first + await charger.update() + + # Check firmware - should use mocked session + result = await charger.firmware_check() + + # Verify result + assert result is not None + assert result["latest_version"] == "v4.2.0" + + +async def test_session_not_closed_when_external(mock_aioclient): + """Test that external session is not closed by the library.""" + mock_aioclient.get( + TEST_URL_STATUS, + status=200, + body=load_fixture("v4_json/status.json"), + ) + mock_aioclient.get( + TEST_URL_CONFIG, + status=200, + body=load_fixture("v4_json/config.json"), + ) + + # Create a real session + session = aiohttp.ClientSession() + + try: + # Create OpenEVSE instance with external session + charger = OpenEVSE(TEST_TLD, session=session) + + # Update to initialize websocket + await charger.update() + + # Disconnect websocket + await charger.ws_disconnect() + + # Session should still be open (not closed by library) + assert not session.closed + + finally: + # Clean up the session ourselves + await session.close() diff --git a/tests/test_main.py b/tests/test_main.py index e807beb..a5923c8 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,27 +1,38 @@ """Library tests.""" +import asyncio import json import logging +from datetime import datetime, timedelta, timezone from unittest import mock -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch import aiohttp import pytest -from datetime import datetime, timezone, timedelta -from freezegun import freeze_time from aiohttp.client_exceptions import ContentTypeError, ServerTimeoutError from aiohttp.client_reqrep import ConnectionKey +from awesomeversion import AwesomeVersion +from awesomeversion.exceptions import AwesomeVersionCompareException +from freezegun import freeze_time import openevsehttp.__main__ as main from openevsehttp.__main__ import OpenEVSE from openevsehttp.exceptions import ( + AlreadyListening, + AuthenticationError, InvalidType, + MissingMethod, MissingSerial, + ParseJSONError, UnknownError, UnsupportedFeature, ) from openevsehttp.websocket import ( + SIGNAL_CONNECTION_STATE, + STATE_CONNECTED, STATE_DISCONNECTED, + STATE_STOPPED, + OpenEVSEWebsocket, ) from tests.common import load_fixture @@ -2333,3 +2344,876 @@ async def test_power(fixture, expected, request): assert charger.current_power == expected await charger.ws_disconnect() + + +# Additional coverage tests for error handling and edge cases + + +async def test_process_request_missing_method(): + """Test process_request raises error when method is None.""" + + charger = OpenEVSE(SERVER_URL) + + with pytest.raises(MissingMethod): + await charger.process_request(TEST_URL_STATUS, method=None) + + +async def test_process_request_unicode_decode_error(mock_aioclient): + """Test process_request handles UnicodeDecodeError.""" + # Create a mock response that raises UnicodeDecodeError on text() + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.text = AsyncMock( + side_effect=UnicodeDecodeError("utf-8", b"", 0, 1, "") + ) + mock_response.read = AsyncMock(return_value=b'{"status": "ok"}') + + mock_aioclient.get( + TEST_URL_STATUS, + status=200, + body='{"status": "ok"}', + ) + + # Patch the session.get to return our mock response + with patch("aiohttp.ClientSession.get") as mock_get: + mock_get.return_value.__aenter__.return_value = mock_response + + charger = OpenEVSE(SERVER_URL) + result = await charger.process_request(TEST_URL_STATUS, method="get") + + assert result == {"status": "ok"} + + +async def test_process_request_non_json_response(mock_aioclient): + """Test process_request handles non-JSON response.""" + mock_aioclient.get( + TEST_URL_STATUS, + status=200, + body="Not JSON", + ) + + charger = OpenEVSE(SERVER_URL) + result = await charger.process_request(TEST_URL_STATUS, method="get") + + # Should return the string as-is + assert result == "Not JSON" + + +async def test_process_request_400_error_with_msg(mock_aioclient): + """Test process_request handles 400 error with msg field.""" + + mock_aioclient.get( + TEST_URL_STATUS, + status=400, + body='{"msg": "Bad request"}', + ) + + charger = OpenEVSE(SERVER_URL) + + with pytest.raises(ParseJSONError): + await charger.process_request(TEST_URL_STATUS, method="get") + + +async def test_process_request_400_error_with_error_field(mock_aioclient): + """Test process_request handles 400 error with error field.""" + + mock_aioclient.get( + TEST_URL_STATUS, + status=400, + body='{"error": "Invalid input"}', + ) + + charger = OpenEVSE(SERVER_URL) + + with pytest.raises(ParseJSONError): + await charger.process_request(TEST_URL_STATUS, method="get") + + +async def test_process_request_401_error(mock_aioclient): + """Test process_request handles 401 authentication error.""" + + mock_aioclient.get( + TEST_URL_STATUS, + status=401, + body='{"error": "Unauthorized"}', + ) + + charger = OpenEVSE(SERVER_URL) + + with pytest.raises(AuthenticationError): + await charger.process_request(TEST_URL_STATUS, method="get") + + +async def test_process_request_404_error(mock_aioclient): + """Test process_request handles 404 error.""" + mock_aioclient.get( + TEST_URL_STATUS, + status=404, + body='{"error": "Not found"}', + ) + + charger = OpenEVSE(SERVER_URL) + # Should not raise, just log warning + result = await charger.process_request(TEST_URL_STATUS, method="get") + assert result == {"error": "Not found"} + + +async def test_process_request_405_error(mock_aioclient): + """Test process_request handles 405 error.""" + mock_aioclient.get( + TEST_URL_STATUS, + status=405, + body='{"error": "Method not allowed"}', + ) + + charger = OpenEVSE(SERVER_URL) + # Should not raise, just log warning + result = await charger.process_request(TEST_URL_STATUS, method="get") + assert result == {"error": "Method not allowed"} + + +async def test_process_request_500_error(mock_aioclient): + """Test process_request handles 500 error.""" + mock_aioclient.get( + TEST_URL_STATUS, + status=500, + body='{"error": "Internal server error"}', + ) + + charger = OpenEVSE(SERVER_URL) + # Should not raise, just log warning + result = await charger.process_request(TEST_URL_STATUS, method="get") + assert result == {"error": "Internal server error"} + + +async def test_process_request_timeout_error(): + """Test process_request handles TimeoutError.""" + with patch("aiohttp.ClientSession.get") as mock_get: + mock_get.side_effect = TimeoutError("Connection timeout") + + charger = OpenEVSE(SERVER_URL) + + with pytest.raises(TimeoutError): + await charger.process_request(TEST_URL_STATUS, method="get") + + +async def test_process_request_server_timeout_error(): + """Test process_request handles ServerTimeoutError.""" + with patch("aiohttp.ClientSession.get") as mock_get: + mock_get.side_effect = ServerTimeoutError("Server timeout") + + charger = OpenEVSE(SERVER_URL) + + with pytest.raises(ServerTimeoutError): + await charger.process_request(TEST_URL_STATUS, method="get") + + +async def test_process_request_content_type_error(): + """Test process_request handles ContentTypeError.""" + with patch("aiohttp.ClientSession.get") as mock_get: + error = ContentTypeError( + request_info=MagicMock(), + history=(), + message="Invalid content type", + ) + mock_get.side_effect = error + + charger = OpenEVSE(SERVER_URL) + + with pytest.raises(ContentTypeError): + await charger.process_request(TEST_URL_STATUS, method="get") + + +async def test_process_request_post_with_config_version(mock_aioclient): + """Test process_request calls update when posting config_version.""" + mock_aioclient.post( + TEST_URL_CONFIG, + status=200, + body='{"config_version": "1.0"}', + ) + mock_aioclient.get( + TEST_URL_STATUS, + status=200, + body=load_fixture("v4_json/status.json"), + ) + mock_aioclient.get( + TEST_URL_CONFIG, + status=200, + body=load_fixture("v4_json/config.json"), + ) + + charger = OpenEVSE(SERVER_URL) + + # This should trigger update() because of config_version in response + result = await charger.process_request(TEST_URL_CONFIG, method="post", data={}) + + assert "config_version" in result + # Verify update was called by checking if status was set + assert charger._status is not None + + +async def test_send_command_no_ret_with_msg(mock_aioclient): + """Test send_command when response has msg but no ret.""" + mock_aioclient.post( + "http://openevse.test.tld/r", + status=200, + body='{"msg": "Command failed"}', + ) + + charger = OpenEVSE(SERVER_URL) + result = await charger.send_command("test_command") + + assert result == (False, "Command failed") + + +async def test_send_command_no_ret_no_msg(mock_aioclient): + """Test send_command when response has neither ret nor msg.""" + mock_aioclient.post( + "http://openevse.test.tld/r", + status=200, + body='{"error": "Unknown"}', + ) + + charger = OpenEVSE(SERVER_URL) + result = await charger.send_command("test_command") + + assert result == (False, "") + + +async def test_firmware_check_no_config(): + """Test firmware_check when config is not loaded.""" + charger = OpenEVSE(SERVER_URL) + + result = await charger.firmware_check() + + assert result is None + + +async def test_firmware_check_no_firmware_version(mock_aioclient): + """Test firmware_check when firmware_version is missing.""" + mock_aioclient.get( + TEST_URL_STATUS, + status=200, + body="{}", + ) + mock_aioclient.get( + TEST_URL_CONFIG, + status=200, + body='{"hostname": "openevse"}', + ) + + charger = OpenEVSE(SERVER_URL) + await charger.update() + + result = await charger.firmware_check() + + assert result is None + + +async def test_firmware_check_github_api_error(mock_aioclient): + """Test firmware_check when GitHub API fails.""" + mock_aioclient.get( + TEST_URL_STATUS, + status=200, + body=load_fixture("v4_json/status.json"), + ) + mock_aioclient.get( + TEST_URL_CONFIG, + status=200, + body=load_fixture("v4_json/config.json"), + ) + mock_aioclient.get( + "https://api.github.com/repos/OpenEVSE/ESP32_WiFi_V4.x/releases/latest", + status=404, + body='{"error": "Not found"}', + ) + + charger = OpenEVSE(SERVER_URL) + await charger.update() + + result = await charger.firmware_check() + + # Should return None when GitHub API fails + assert result is None + + +async def test_property_getters_with_missing_data(mock_aioclient): + """Test property getters when data is missing.""" + mock_aioclient.get( + TEST_URL_STATUS, + status=200, + body="{}", # Empty status + ) + mock_aioclient.get( + TEST_URL_CONFIG, + status=200, + body="{}", # Empty config + ) + + charger = OpenEVSE(SERVER_URL) + await charger.update() + + # Test various properties that should handle missing data + # String/numeric properties return None + assert charger.hostname is None + assert charger.ammeter_offset is None + assert charger.ammeter_scale_factor is None + assert charger.service_level is None + + # Boolean properties return False when data is missing + assert charger.temp_check_enabled is False + assert charger.diode_check_enabled is False + assert charger.vent_required_enabled is False + assert charger.ground_check_enabled is False + assert charger.stuck_relay_check_enabled is False + + +async def test_external_session_with_error_handling(mock_aioclient): + """Test external session handles errors properly.""" + + mock_aioclient.get( + TEST_URL_STATUS, + status=401, + body='{"error": "Unauthorized"}', + ) + + async with aiohttp.ClientSession() as session: + charger = OpenEVSE(SERVER_URL, session=session) + + with pytest.raises(AuthenticationError): + await charger.process_request(TEST_URL_STATUS, method="get") + + # Session should still be open + assert not session.closed + + +async def test_external_session_unicode_decode_error(): + """Test external session handles UnicodeDecodeError.""" + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.text = AsyncMock( + side_effect=UnicodeDecodeError("utf-8", b"", 0, 1, "") + ) + mock_response.read = AsyncMock(return_value=b'{"status": "ok"}') + + with patch("aiohttp.ClientSession.get") as mock_get: + mock_get.return_value.__aenter__.return_value = mock_response + + async with aiohttp.ClientSession() as session: + charger = OpenEVSE(SERVER_URL, session=session) + result = await charger.process_request(TEST_URL_STATUS, method="get") + + assert result == {"status": "ok"} + + +async def test_external_session_non_json_response(): + """Test external session handles non-JSON response.""" + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.text = AsyncMock(return_value="Not JSON") + + with patch("aiohttp.ClientSession.get") as mock_get: + mock_get.return_value.__aenter__.return_value = mock_response + + async with aiohttp.ClientSession() as session: + charger = OpenEVSE(SERVER_URL, session=session) + result = await charger.process_request(TEST_URL_STATUS, method="get") + + assert result == "Not JSON" + + +async def test_external_session_400_error_with_msg(): + """Test external session handles 400 error with msg field.""" + mock_response = AsyncMock() + mock_response.status = 400 + mock_response.text = AsyncMock(return_value='{"msg": "Bad request"}') + + with patch("aiohttp.ClientSession.get") as mock_get: + mock_get.return_value.__aenter__.return_value = mock_response + + async with aiohttp.ClientSession() as session: + charger = OpenEVSE(SERVER_URL, session=session) + + with pytest.raises(ParseJSONError): + await charger.process_request(TEST_URL_STATUS, method="get") + + +async def test_external_session_400_error_with_error_field(): + """Test external session handles 400 error with error field.""" + mock_response = AsyncMock() + mock_response.status = 400 + mock_response.text = AsyncMock(return_value='{"error": "Invalid input"}') + + with patch("aiohttp.ClientSession.get") as mock_get: + mock_get.return_value.__aenter__.return_value = mock_response + + async with aiohttp.ClientSession() as session: + charger = OpenEVSE(SERVER_URL, session=session) + + with pytest.raises(ParseJSONError): + await charger.process_request(TEST_URL_STATUS, method="get") + + +async def test_external_session_401_error(): + """Test external session handles 401 authentication error.""" + mock_response = AsyncMock() + mock_response.status = 401 + mock_response.text = AsyncMock(return_value='{"error": "Unauthorized"}') + + with patch("aiohttp.ClientSession.get") as mock_get: + mock_get.return_value.__aenter__.return_value = mock_response + + async with aiohttp.ClientSession() as session: + charger = OpenEVSE(SERVER_URL, session=session) + + with pytest.raises(AuthenticationError): + await charger.process_request(TEST_URL_STATUS, method="get") + + +async def test_external_session_404_error(): + """Test external session handles 404 error.""" + mock_response = AsyncMock() + mock_response.status = 404 + mock_response.text = AsyncMock(return_value='{"error": "Not found"}') + + with patch("aiohttp.ClientSession.get") as mock_get: + mock_get.return_value.__aenter__.return_value = mock_response + + async with aiohttp.ClientSession() as session: + charger = OpenEVSE(SERVER_URL, session=session) + result = await charger.process_request(TEST_URL_STATUS, method="get") + + assert result == {"error": "Not found"} + + +async def test_external_session_405_error(): + """Test external session handles 405 error.""" + mock_response = AsyncMock() + mock_response.status = 405 + mock_response.text = AsyncMock(return_value='{"error": "Method not allowed"}') + + with patch("aiohttp.ClientSession.get") as mock_get: + mock_get.return_value.__aenter__.return_value = mock_response + + async with aiohttp.ClientSession() as session: + charger = OpenEVSE(SERVER_URL, session=session) + result = await charger.process_request(TEST_URL_STATUS, method="get") + + assert result == {"error": "Method not allowed"} + + +async def test_external_session_500_error(): + """Test external session handles 500 error.""" + mock_response = AsyncMock() + mock_response.status = 500 + mock_response.text = AsyncMock(return_value='{"error": "Internal server error"}') + + with patch("aiohttp.ClientSession.get") as mock_get: + mock_get.return_value.__aenter__.return_value = mock_response + + async with aiohttp.ClientSession() as session: + charger = OpenEVSE(SERVER_URL, session=session) + result = await charger.process_request(TEST_URL_STATUS, method="get") + + assert result == {"error": "Internal server error"} + + +async def test_external_session_post_with_config_version(mock_aioclient): + """Test external session with POST that triggers update.""" + mock_aioclient.post( + TEST_URL_CONFIG, + status=200, + body='{"config_version": "1.0"}', + ) + mock_aioclient.get( + TEST_URL_STATUS, + status=200, + body=load_fixture("v4_json/status.json"), + ) + mock_aioclient.get( + TEST_URL_CONFIG, + status=200, + body=load_fixture("v4_json/config.json"), + ) + + async with aiohttp.ClientSession() as session: + charger = OpenEVSE(SERVER_URL, session=session) + result = await charger.process_request(TEST_URL_CONFIG, method="post", data={}) + + assert "config_version" in result + assert charger._status is not None + + +async def test_external_session_timeout_error(): + """Test external session handles TimeoutError.""" + with patch("aiohttp.ClientSession.get") as mock_get: + mock_get.side_effect = TimeoutError("Connection timeout") + + async with aiohttp.ClientSession() as session: + charger = OpenEVSE(SERVER_URL, session=session) + + with pytest.raises(TimeoutError): + await charger.process_request(TEST_URL_STATUS, method="get") + + +async def test_external_session_server_timeout_error(): + """Test external session handles ServerTimeoutError.""" + with patch("aiohttp.ClientSession.get") as mock_get: + mock_get.side_effect = ServerTimeoutError("Server timeout") + + async with aiohttp.ClientSession() as session: + charger = OpenEVSE(SERVER_URL, session=session) + + with pytest.raises(ServerTimeoutError): + await charger.process_request(TEST_URL_STATUS, method="get") + + +async def test_external_session_content_type_error(): + """Test external session handles ContentTypeError.""" + with patch("aiohttp.ClientSession.get") as mock_get: + error = ContentTypeError( + request_info=MagicMock(), + history=(), + message="Invalid content type", + ) + mock_get.side_effect = error + + async with aiohttp.ClientSession() as session: + charger = OpenEVSE(SERVER_URL, session=session) + + with pytest.raises(ContentTypeError): + await charger.process_request(TEST_URL_STATUS, method="get") + + +async def test_identify_with_buildenv(mock_aioclient): + """Test test_and_get method (identify) with buildenv in response.""" + mock_aioclient.get( + "http://openevse.test.tld/config", + status=200, + body='{"wifi_serial": "123", "buildenv": "esp32"}', + ) + charger = OpenEVSE(SERVER_URL) + data = await charger.test_and_get() + assert data["model"] == "esp32" + + +async def test_ws_start_already_listening(): + """Test ws_start raises AlreadyListening if already listening.""" + charger = OpenEVSE(SERVER_URL) + charger.websocket = MagicMock() + charger.websocket.state = "connected" + charger._ws_listening = True + + with pytest.raises(AlreadyListening): + charger.ws_start() + + +async def test_ws_start_reset_listening(): + """Test ws_start resets _ws_listening if websocket is not connected.""" + charger = OpenEVSE(SERVER_URL) + charger.websocket = MagicMock() + charger.websocket.state = "disconnected" + charger._ws_listening = True + + with patch.object(charger, "_start_listening"): + charger.ws_start() + assert charger._ws_listening is False + + +async def test_start_listening_no_loop(): + """Test _start_listening when no running loop is found.""" + charger = OpenEVSE(SERVER_URL) + charger.websocket = MagicMock() + + with patch("asyncio.get_running_loop", side_effect=RuntimeError): + with patch("asyncio.get_event_loop") as mock_get_loop: + mock_loop = MagicMock() + mock_get_loop.return_value = mock_loop + charger._start_listening() + assert charger._loop == mock_loop + + +async def test_update_status_states(): + """Test _update_status with different websocket states.""" + charger = OpenEVSE(SERVER_URL) + charger.websocket = MagicMock() + charger.websocket.uri = "ws://test" + + # Test connected + await charger._update_status(SIGNAL_CONNECTION_STATE, STATE_CONNECTED, None) + assert charger._ws_listening is True + + # Test disconnected + await charger._update_status( + SIGNAL_CONNECTION_STATE, STATE_DISCONNECTED, "test error" + ) + assert charger._ws_listening is False + + # Test stopped with error + await charger._update_status(SIGNAL_CONNECTION_STATE, STATE_STOPPED, "fatal error") + assert charger._ws_listening is False + + +async def test_update_status_data_triggers(mock_aioclient): + """Test _update_status with data that triggers update and callback.""" + mock_aioclient.get( + "http://openevse.test.tld/status", + status=200, + body='{"version": "4.0.1"}', + ) + mock_aioclient.get( + "http://openevse.test.tld/config", + status=200, + body='{"hostname": "test"}', + ) + + charger = OpenEVSE(SERVER_URL) + + # Set a coroutine callback + mock_callback = AsyncMock() + charger.callback = mock_callback + + # "wh" should be popped to "watthour" + # "config_version" is in UPDATE_TRIGGERS + data = {"wh": 100, "config_version": 2} + await charger._update_status("data", data, None) + + assert data["watthour"] == 100 + assert "wh" not in data + assert charger._status["watthour"] == 100 + mock_callback.assert_called_once() + + # Test non-coroutine callback + charger.callback = MagicMock() + await charger._update_status("data", {"test": 1}, None) + charger.callback.assert_called_once() + + +async def test_version_check_exceptions(): + """Test _version_check exception paths.""" + charger = OpenEVSE(SERVER_URL) + + # Trigger re.search Exception + charger._config = {"version": "invalid"} + with patch("re.search", side_effect=Exception): + assert charger._version_check("2.0.0") is False + + # Trigger AwesomeVersionCompareException in limit comparison + + with patch( + "awesomeversion.AwesomeVersion.__le__", + side_effect=AwesomeVersionCompareException, + ): + charger._config = {"version": "2.9.1"} + assert charger._version_check("2.0.0", "3.0.0") is False + + +async def test_get_schedule(mock_aioclient): + """Test get_schedule method.""" + mock_aioclient.post( + "http://openevse.test.tld/schedule", + status=200, + body='{"sc": 1}', + ) + charger = OpenEVSE(SERVER_URL) + result = await charger.get_schedule() + assert result == {"sc": 1} + + +async def test_repeat(): + """Test repeat helper.""" + charger = OpenEVSE(SERVER_URL) + charger.websocket = MagicMock() + # Mock ws_state to stop after one iteration + with patch( + "openevsehttp.__main__.OpenEVSE.ws_state", new_callable=PropertyMock + ) as mock_state: + mock_state.side_effect = ["connected", "stopped"] + + mock_func = AsyncMock() + with patch("asyncio.sleep", AsyncMock()): + await charger.repeat(1, mock_func, "test") + mock_func.assert_called_once_with("test") + + +async def test_ir_temperature(): + """Test ir_temperature property.""" + charger = OpenEVSE(SERVER_URL) + charger._status = {"temp3": 250} + assert charger.ir_temperature == 25.0 + + +async def test_usage_session_none(): + """Test usage_session returns None when no data is present.""" + charger = OpenEVSE(SERVER_URL) + charger._status = {} + assert charger.usage_session is None + + +async def test_version_check_master(): + """Test _version_check with 'master' in version.""" + charger = OpenEVSE(SERVER_URL) + charger._config = {"version": "v4.0.1.master"} + # This should set value to "dev" + assert charger._version_check("2.0.0") is True + + +async def test_version_check_limit(): + """Test _version_check with max_version.""" + charger = OpenEVSE(SERVER_URL) + charger._config = {"version": "2.9.1"} + assert charger._version_check("2.0.0", "3.0.0") is True + assert charger._version_check("3.0.0", "4.0.0") is False + + # Test the wrapper + assert charger.version_check("2.0.0") is True + + +async def test_firmware_check_external_session(mock_aioclient): + """Test firmware_check with an external session.""" + mock_aioclient.get( + "http://openevse.test.tld/status", + status=200, + body='{"version": "4.0.1", "wifi_serial": "123"}', + ) + mock_aioclient.get( + "http://openevse.test.tld/config", + status=200, + body='{"hostname": "test", "version": "4.0.1"}', + ) + mock_aioclient.get( + "https://api.github.com/repos/OpenEVSE/ESP32_WiFi_V4.x/releases/latest", + status=200, + body='{"tag_name": "v4.1.0", "body": "notes", "html_url": "http://github"}', + ) + + async with aiohttp.ClientSession() as session: + charger = OpenEVSE(SERVER_URL, session=session) + await charger.update() + # Ensure version is set in config + charger._config["version"] = "4.0.1" + result = await charger.firmware_check() + assert result["latest_version"] == "v4.1.0" + + +async def test_firmware_check_errors(mock_aioclient): + """Test firmware_check error paths.""" + mock_aioclient.get( + "http://openevse.test.tld/status", + status=200, + body='{"version": "4.0.1", "wifi_serial": "123"}', + ) + mock_aioclient.get( + "http://openevse.test.tld/config", + status=200, + body='{"hostname": "test", "version": "4.0.1"}', + ) + + url = "https://api.github.com/repos/OpenEVSE/ESP32_WiFi_V4.x/releases/latest" + + # Status 404 from github + mock_aioclient.get(url, status=404) + + async with aiohttp.ClientSession() as session: + charger = OpenEVSE(SERVER_URL, session=session) + await charger.update() + charger._config["version"] = "4.0.1" + assert await charger.firmware_check() is None + + # Timeout from github + mock_aioclient.get(url, exception=asyncio.TimeoutError()) + charger = OpenEVSE(SERVER_URL) + charger._config["version"] = "4.0.1" + assert await charger.firmware_check() is None + + # ContentTypeError from github + + mock_aioclient.get( + url, exception=ContentTypeError(MagicMock(), MagicMock(), message="test") + ) + assert await charger.firmware_check() is None + + +async def test_websocket_pong(): + """Test websocket handles pong message.""" + + callback = AsyncMock() + async with aiohttp.ClientSession() as session: + ws = OpenEVSEWebsocket(f"http://{SERVER_URL}", callback, session=session) + + mock_ws = AsyncMock() + # Mock the async iterator of ws_client + msg = MagicMock() + msg.type = aiohttp.WSMsgType.TEXT + msg.json.return_value = {"pong": 1} + mock_ws.__aiter__.return_value = [msg] + + with patch.object(session, "ws_connect") as mock_ws_connect: + mock_context = AsyncMock() + mock_context.__aenter__.return_value = mock_ws + mock_ws_connect.return_value = mock_context + + # Set state to stopped after one iteration to break the loop + ws.state = "connected" + + async def side_effect(msgtype, data, error): + if msgtype == SIGNAL_CONNECTION_STATE and data == STATE_STOPPED: + pass + elif msgtype == "data" and "pong" in data: + ws.state = "stopped" + + callback.side_effect = side_effect + + await ws.running() + assert ws._pong is not None + + +async def test_websocket_listen(): + """Test websocket listen calls running.""" + + callback = AsyncMock() + ws = OpenEVSEWebsocket(f"http://{SERVER_URL}", callback) + + with patch.object(ws, "running", AsyncMock()) as mock_running: + # Break loop after first call + ws.state = "starting" + + async def side_effect(): + ws.state = "stopped" + + mock_running.side_effect = side_effect + + await ws.listen() + mock_running.assert_called_once() + + +async def test_websocket_stop_break(): + """Test websocket stops loop when state is stopped.""" + + callback = AsyncMock() + async with aiohttp.ClientSession() as session: + ws = OpenEVSEWebsocket(f"http://{SERVER_URL}", callback, session=session) + + mock_ws = AsyncMock() + msg = MagicMock() + msg.type = aiohttp.WSMsgType.TEXT + msg.json.return_value = {"test": 1} + mock_ws.__aiter__.return_value = [msg, msg] + + with patch.object(session, "ws_connect") as mock_ws_connect: + mock_context = AsyncMock() + mock_context.__aenter__.return_value = mock_ws + mock_ws_connect.return_value = mock_context + + ws.state = "connected" + + async def side_effect(msgtype, _data, _error): + if msgtype == "data": + ws._state = "stopped" # Direct set to avoid callback loop + + callback.side_effect = side_effect + + await ws.running() + # Check that we received "data" once + calls = [call for call in callback.call_args_list if call[0][0] == "data"] + assert len(calls) == 1 diff --git a/tests/test_websocket.py b/tests/test_websocket.py index d97d7aa..0a8586e 100644 --- a/tests/test_websocket.py +++ b/tests/test_websocket.py @@ -170,8 +170,8 @@ async def test_websocket_auth(ws_client_auth): # Use an async generator function for clean async iteration async def empty_iter(): - if False: - yield + return + yield mock_ws.__aiter__.side_effect = empty_iter