From 0179fad95c61628670c1614cf6b5c23cc3ba8236 Mon Sep 17 00:00:00 2001 From: ArnoutD Date: Tue, 14 Jan 2025 22:37:37 +0100 Subject: [PATCH 1/4] Correct subscription to feature updates for only intended node --- plugwise_usb/nodes/helpers/subscription.py | 10 ++++++---- plugwise_usb/nodes/node.py | 4 +++- tests/test_usb.py | 2 +- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/plugwise_usb/nodes/helpers/subscription.py b/plugwise_usb/nodes/helpers/subscription.py index da91c656c..a3b2c0554 100644 --- a/plugwise_usb/nodes/helpers/subscription.py +++ b/plugwise_usb/nodes/helpers/subscription.py @@ -7,6 +7,7 @@ from dataclasses import dataclass from typing import Any + from ...api import NodeFeature @@ -20,11 +21,12 @@ class NodeFeatureSubscription: class FeaturePublisher: """Base Class to call awaitable of subscription when event happens.""" + def __init__(self) -> None: + self._feature_update_subscribers: dict[ + Callable[[], None], + NodeFeatureSubscription, + ] = {} - _feature_update_subscribers: dict[ - Callable[[], None], - NodeFeatureSubscription, - ] = {} def subscribe_to_feature_update( self, diff --git a/plugwise_usb/nodes/node.py b/plugwise_usb/nodes/node.py index fd586406d..75ec5dd64 100644 --- a/plugwise_usb/nodes/node.py +++ b/plugwise_usb/nodes/node.py @@ -63,6 +63,7 @@ def __init__( loaded_callback: Callable[[NodeEvent, str], Awaitable[None]], ): """Initialize Plugwise base node class.""" + super().__init__() self._loaded_callback = loaded_callback self._message_subscribe = controller.subscribe_to_messages self._features: tuple[NodeFeature, ...] = NODE_FEATURES @@ -415,7 +416,8 @@ async def _available_update_state( if ( self._last_seen is not None and timestamp is not None - and self._last_seen < timestamp + and (timestamp - self._last_seen).seconds > 5 + ): self._last_seen = timestamp await self.publish_feature_update_to_subscribers( diff --git a/tests/test_usb.py b/tests/test_usb.py index 3bd1ef5c1..97731aec6 100644 --- a/tests/test_usb.py +++ b/tests/test_usb.py @@ -838,7 +838,7 @@ async def test_node_relay_and_power(self, monkeypatch: pytest.MonkeyPatch) -> No assert await stick.nodes["2222222222222222"].load() self.test_init_relay_state_on = asyncio.Future() self.test_init_relay_state_off = asyncio.Future() - unsub_inti_relay = stick.nodes["0098765432101234"].subscribe_to_feature_update( + unsub_inti_relay = stick.nodes["2222222222222222"].subscribe_to_feature_update( node_feature_callback=self.node_init_relay_state, features=(pw_api.NodeFeature.RELAY_INIT,), ) From d450f03390766eab774e054b1eedcb6872a31282 Mon Sep 17 00:00:00 2001 From: ArnoutD Date: Tue, 14 Jan 2025 22:43:23 +0100 Subject: [PATCH 2/4] Correct message overload on intitial circle log fetch. Limit outstanding messages to 4 --- plugwise_usb/connection/manager.py | 7 +++++++ plugwise_usb/connection/queue.py | 14 +++++++++++--- plugwise_usb/connection/receiver.py | 13 ++++++++++++- plugwise_usb/connection/sender.py | 7 +++++++ plugwise_usb/nodes/circle.py | 18 +++++++----------- 5 files changed, 44 insertions(+), 15 deletions(-) diff --git a/plugwise_usb/connection/manager.py b/plugwise_usb/connection/manager.py index 7dea480be..74f1203e3 100644 --- a/plugwise_usb/connection/manager.py +++ b/plugwise_usb/connection/manager.py @@ -36,6 +36,13 @@ def __init__(self) -> None: ] = {} self._unsubscribe_stick_events: Callable[[], None] | None = None + @property + def queue_depth(self) -> int: + return self._sender.processed_messages - self._receiver.processed_messages + + def correct_received_messages(self, correction: int) -> None: + self._receiver.correct_processed_messages(correction) + @property def serial_path(self) -> str: """Return current port.""" diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index 12362a22d..f84754868 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -82,7 +82,7 @@ async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse: ) while request.resend and not request.waiting_for_response: - _LOGGER.warning("submit | start (%s) %s", request.retries_left, request) + _LOGGER.debug("submit | start (%s) %s", request.retries_left, request) if not self._running or self._stick is None: raise StickError( f"Cannot send message {request.__class__.__name__} for" @@ -91,6 +91,7 @@ async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse: await self._add_request_to_queue(request) try: response: PlugwiseResponse = await request.response_future() + return response except (NodeTimeout, StickTimeout) as e: if isinstance(request, NodePingRequest): # For ping requests it is expected to receive timeouts, so lower log level @@ -103,17 +104,19 @@ async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse: _LOGGER.warning("%s, cancel request", e) # type: ignore[unreachable] except StickError as exception: _LOGGER.error(exception) + self._stick.correct_received_messages(1) raise StickError( f"No response received for {request.__class__.__name__} " + f"to {request.mac_decoded}" ) from exception except BaseException as exception: + self._stick.correct_received_messages(1) raise StickError( f"No response received for {request.__class__.__name__} " + f"to {request.mac_decoded}" ) from exception - return response + return None async def _add_request_to_queue(self, request: PlugwiseRequest) -> None: """Add request to send queue.""" @@ -133,8 +136,13 @@ async def _send_queue_worker(self) -> None: if request.priority == Priority.CANCEL: self._submit_queue.task_done() return + + while self._stick.queue_depth > 3: + _LOGGER.info("Awaiting plugwise responses %d", self._stick.queue_depth) + await sleep(0.125) + await self._stick.write_to_stick(request) self._submit_queue.task_done() - await sleep(0.001) + _LOGGER.debug("Sent from queue %s", request) _LOGGER.debug("Send_queue_worker stopped") diff --git a/plugwise_usb/connection/receiver.py b/plugwise_usb/connection/receiver.py index 6b36be0d3..917537afb 100644 --- a/plugwise_usb/connection/receiver.py +++ b/plugwise_usb/connection/receiver.py @@ -99,6 +99,7 @@ def __init__( self._data_worker_task: Task[None] | None = None # Message processing + self._processed_msgs = 0 self._message_queue: PriorityQueue[PlugwiseResponse] = PriorityQueue() self._last_processed_messages: list[bytes] = [] self._current_seq_id: bytes | None = None @@ -137,11 +138,20 @@ def connection_lost(self, exc: Exception | None = None) -> None: self._transport = None self._connection_state = False + @property + def processed_messages(self) -> int: + """Return the number of processed messages.""" + return self._processed_msgs + @property def is_connected(self) -> bool: """Return current connection state of the USB-Stick.""" return self._connection_state + def correct_processed_messages(self, correction: int) -> None: + """Return the number of processed messages.""" + self._processed_msgs += correction + def connection_made(self, transport: SerialTransport) -> None: """Call when the serial connection to USB-Stick is established.""" _LOGGER.info("Connection made") @@ -278,6 +288,7 @@ async def _message_queue_worker(self) -> None: await self._notify_stick_subscribers(response) else: await self._notify_node_response_subscribers(response) + self._processed_msgs += 1 self._message_queue.task_done() await sleep(0) _LOGGER.debug("Message queue worker stopped") @@ -457,7 +468,7 @@ async def _notify_node_response_subscribers( self._node_subscription_lock.release() if len(notify_tasks) > 0: - _LOGGER.debug("Received %s", node_response) + _LOGGER.debug("Received %s %s", node_response, node_response.seq_id) if node_response.seq_id not in BROADCAST_IDS: self._last_processed_messages.append(node_response.seq_id) # Limit tracking to only the last appended request (FIFO) diff --git a/plugwise_usb/connection/sender.py b/plugwise_usb/connection/sender.py index a87ac33df..007103a25 100644 --- a/plugwise_usb/connection/sender.py +++ b/plugwise_usb/connection/sender.py @@ -38,11 +38,17 @@ def __init__(self, stick_receiver: StickReceiver, transport: Transport) -> None: self._loop = get_running_loop() self._receiver = stick_receiver self._transport = transport + self._processed_msgs = 0 self._stick_response: Future[StickResponse] | None = None self._stick_lock = Lock() self._current_request: None | PlugwiseRequest = None self._unsubscribe_stick_response: Callable[[], None] | None = None + @property + def processed_messages(self) -> int: + """Return the number of processed messages.""" + return self._processed_msgs + async def start(self) -> None: """Start the sender.""" # Subscribe to ACCEPT stick responses, which contain the seq_id we need. @@ -133,6 +139,7 @@ async def write_request_to_port(self, request: PlugwiseRequest) -> None: finally: self._stick_response.cancel() self._stick_lock.release() + self._processed_msgs += 1 async def _process_stick_response(self, response: StickResponse) -> None: """Process stick response.""" diff --git a/plugwise_usb/nodes/circle.py b/plugwise_usb/nodes/circle.py index 4a9854dcd..5886f580b 100644 --- a/plugwise_usb/nodes/circle.py +++ b/plugwise_usb/nodes/circle.py @@ -2,7 +2,7 @@ from __future__ import annotations -from asyncio import Task, create_task, gather +from asyncio import Task, create_task from collections.abc import Awaitable, Callable from dataclasses import replace from datetime import UTC, datetime @@ -453,11 +453,8 @@ async def get_missing_energy_logs(self) -> None: log_address, _ = calc_log_address(log_address, 1, -4) total_addresses -= 1 - if not all(await gather(*log_update_tasks)): - _LOGGER.info( - "Failed to request one or more update energy log for %s", - self._mac_in_str, - ) + for task in log_update_tasks: + await task if self._cache_enabled: await self._energy_log_records_save_to_cache() @@ -475,9 +472,8 @@ async def get_missing_energy_logs(self) -> None: ) missing_addresses = sorted(missing_addresses, reverse=True) - await gather( - *[self.energy_log_update(address) for address in missing_addresses] - ) + for address in missing_addresses: + await self.energy_log_update(address) if self._cache_enabled: await self._energy_log_records_save_to_cache() @@ -528,7 +524,7 @@ async def _energy_log_records_load_from_cache(self) -> bool: """Load energy_log_record from cache.""" cache_data = self._get_cache(CACHE_ENERGY_COLLECTION) if (cache_data := self._get_cache(CACHE_ENERGY_COLLECTION)) is None: - _LOGGER.debug( + _LOGGER.warning( "Failed to restore energy log records from cache for node %s", self.name ) return False @@ -811,7 +807,7 @@ async def _load_from_cache(self) -> bool: return False # Energy collection if await self._energy_log_records_load_from_cache(): - _LOGGER.debug( + _LOGGER.warning( "Node %s failed to load energy_log_records from cache", self._mac_in_str, ) From 2afa1ce7fe642d821f84af72a1a262d53c3d9cd4 Mon Sep 17 00:00:00 2001 From: ArnoutD Date: Tue, 14 Jan 2025 22:44:13 +0100 Subject: [PATCH 3/4] use os.path.join --- plugwise_usb/helpers/cache.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/plugwise_usb/helpers/cache.py b/plugwise_usb/helpers/cache.py index ad00184cb..256a59094 100644 --- a/plugwise_usb/helpers/cache.py +++ b/plugwise_usb/helpers/cache.py @@ -59,10 +59,8 @@ async def initialize_cache(self, create_root_folder: bool = False) -> None: cache_dir = self._get_writable_os_dir() await makedirs(cache_dir, exist_ok=True) self._cache_path = cache_dir - if os_name == "nt": - self._cache_file = f"{cache_dir}\\{self._file_name}" - else: - self._cache_file = f"{cache_dir}/{self._file_name}" + + self._cache_file = os_path_join(self._cache_path, self._file_name) self._cache_file_exists = await ospath.exists(self._cache_file) self._initialized = True _LOGGER.debug("Start using network cache file: %s", self._cache_file) From 7f2c05eabc58c95918f401877380439dd59d6bc2 Mon Sep 17 00:00:00 2001 From: ArnoutD Date: Tue, 14 Jan 2025 22:50:11 +0100 Subject: [PATCH 4/4] Negate SQ issue --- plugwise_usb/connection/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index f84754868..29d1f74d1 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -74,7 +74,7 @@ async def stop(self) -> None: self._stick = None _LOGGER.debug("queue stopped") - async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse: + async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse | None: """Add request to queue and return the response of node. Raises an error when something fails.""" if request.waiting_for_response: raise MessageError(