From 01ce787c51402d8c8b501b7608b571df54537e8c Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Sun, 1 Jun 2025 09:29:51 +0200 Subject: [PATCH 01/34] Implement DroppingPriorityQueue class --- plugwise_usb/connection/queue.py | 38 ++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index 614277447..1eecee03d 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -2,9 +2,10 @@ from __future__ import annotations -from asyncio import PriorityQueue, Task, get_running_loop, sleep +from asyncio import Queue, Task, get_running_loop, sleep from collections.abc import Callable from dataclasses import dataclass +from sortedcontainers import SortedList import logging from ..api import StickEvent @@ -25,6 +26,39 @@ class RequestState: zigbee_address: int +class DroppingPriorityQueue(Queue): + def _init(self, maxsize): + # called by asyncio.Queue.__init__ + self._queue = SortedList() + + def _put(self, item): + # called by asyncio.Queue.put_nowait + self._queue.add(item) + + def _get(self): + # called by asyncio.Queue.get_nowait + # pop the first (most important) item off the queue + return self._queue.pop(0) + + def __drop(self): + # drop the last (least important) item from the queue + self._queue.pop() + # no consumer will get a chance to process this item, so + # we must decrement the unfinished count ourselves + self.task_done() + + def put_nowait(self, item): + if self.full(): + self.__drop() + super().put_nowait(item) + + async def put(self, item): + # Queue.put blocks when full, so we must override it. + # Since our put_nowait never raises QueueFull, we can just + # call it directly + self.put_nowait(item) + + class StickQueue: """Manage queue of all request sessions.""" @@ -32,7 +66,7 @@ def __init__(self) -> None: """Initialize the message session controller.""" self._stick: StickConnectionManager | None = None self._loop = get_running_loop() - self._submit_queue: PriorityQueue[PlugwiseRequest] = PriorityQueue() + self._submit_queue: DroppingPriorityQueue[PlugwiseRequest] = DroppingPriorityQueue(maxsize=85) self._submit_worker_task: Task[None] | None = None self._unsubscribe_connection_events: Callable[[], None] | None = None self._running = False From 7effe813393b7c5d0d40f7ccf31135e9986c1649 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Sun, 1 Jun 2025 09:32:53 +0200 Subject: [PATCH 02/34] Add sortedcontainers to dependencies --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index ff8bc48cc..5804e217e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "aiofiles", "crcmod", "semver", + "sortedcontainers", ] [project.urls] From 16c0ced2e4d165b079cffa7ad38064c744b5cad5 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Sun, 1 Jun 2025 14:05:18 +0200 Subject: [PATCH 03/34] Remove double line, debug --- plugwise_usb/connection/queue.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index 1eecee03d..ea78b8922 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -175,16 +175,14 @@ async def _send_queue_worker(self) -> None: while self._running and self._stick is not None: request = await self._submit_queue.get() _LOGGER.debug("Sending from send queue %s", request) + _LOGGER.debug("HOI queue: %s", list(self._submit_queue._queue)) if request.priority == Priority.CANCEL: self._submit_queue.task_done() return if self._stick.queue_depth > 3: + _LOGGER.warning("Awaiting plugwise responses %d", self._stick.queue_depth) await sleep(0.125) - if self._stick.queue_depth > 3: - _LOGGER.warning( - "Awaiting plugwise responses %d", self._stick.queue_depth - ) await self._stick.write_to_stick(request) self._submit_queue.task_done() From 6fd4fc5b4b9dd65ba1d2082fca58675d45201fb5 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Sun, 1 Jun 2025 14:08:54 +0200 Subject: [PATCH 04/34] Move debug --- plugwise_usb/connection/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index ea78b8922..1c2d8ee19 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -164,6 +164,7 @@ async def _add_request_to_queue(self, request: PlugwiseRequest) -> None: """Add request to send queue.""" _LOGGER.debug("Add request to queue: %s", request) await self._submit_queue.put(request) + _LOGGER.debug("HOI queue: %s", list(self._submit_queue._queue)) if self._submit_worker_task is None or self._submit_worker_task.done(): self._submit_worker_task = self._loop.create_task( self._send_queue_worker(), name="Send queue worker" @@ -175,7 +176,6 @@ async def _send_queue_worker(self) -> None: while self._running and self._stick is not None: request = await self._submit_queue.get() _LOGGER.debug("Sending from send queue %s", request) - _LOGGER.debug("HOI queue: %s", list(self._submit_queue._queue)) if request.priority == Priority.CANCEL: self._submit_queue.task_done() return From c84bbf93b991c437ccdc098c740aad974648eac9 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Mon, 2 Jun 2025 16:06:09 +0200 Subject: [PATCH 05/34] Show queue maxsize --- plugwise_usb/connection/queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index 1c2d8ee19..f575028d1 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -164,6 +164,7 @@ async def _add_request_to_queue(self, request: PlugwiseRequest) -> None: """Add request to send queue.""" _LOGGER.debug("Add request to queue: %s", request) await self._submit_queue.put(request) + _LOGGER.debug("HOI queue maxsize: %s", self._submit_queue.maxsize) _LOGGER.debug("HOI queue: %s", list(self._submit_queue._queue)) if self._submit_worker_task is None or self._submit_worker_task.done(): self._submit_worker_task = self._loop.create_task( From bc8572311b594ef7c8cf6e7c4faa6f999b31c630 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Mon, 2 Jun 2025 16:54:47 +0200 Subject: [PATCH 06/34] Remove failing debug-line --- plugwise_usb/connection/queue.py | 1 - 1 file changed, 1 deletion(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index f575028d1..e857c3d90 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -165,7 +165,6 @@ async def _add_request_to_queue(self, request: PlugwiseRequest) -> None: _LOGGER.debug("Add request to queue: %s", request) await self._submit_queue.put(request) _LOGGER.debug("HOI queue maxsize: %s", self._submit_queue.maxsize) - _LOGGER.debug("HOI queue: %s", list(self._submit_queue._queue)) if self._submit_worker_task is None or self._submit_worker_task.done(): self._submit_worker_task = self._loop.create_task( self._send_queue_worker(), name="Send queue worker" From c3a25853067b3690d7eeb41522bf3d646281afbc Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Thu, 17 Jul 2025 10:26:30 +0200 Subject: [PATCH 07/34] Set queue-length to 7 seconds max --- plugwise_usb/connection/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index e857c3d90..7c4134c01 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -66,7 +66,7 @@ def __init__(self) -> None: """Initialize the message session controller.""" self._stick: StickConnectionManager | None = None self._loop = get_running_loop() - self._submit_queue: DroppingPriorityQueue[PlugwiseRequest] = DroppingPriorityQueue(maxsize=85) + self._submit_queue: DroppingPriorityQueue[PlugwiseRequest] = DroppingPriorityQueue(maxsize=56) self._submit_worker_task: Task[None] | None = None self._unsubscribe_connection_events: Callable[[], None] | None = None self._running = False From 473e3887a5faf17998c4891d640a765219ebdad4 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Thu, 17 Jul 2025 10:38:57 +0200 Subject: [PATCH 08/34] Add missing docstrings --- plugwise_usb/connection/queue.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index 7c4134c01..9fc684469 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -27,6 +27,10 @@ class RequestState: class DroppingPriorityQueue(Queue): + """Define a queue that has a maximum size. + + Older entries are dropped when the queue reaches its maximum size. + """ def _init(self, maxsize): # called by asyncio.Queue.__init__ self._queue = SortedList() @@ -48,14 +52,16 @@ def __drop(self): self.task_done() def put_nowait(self, item): + """ Override method for queue.put.""" if self.full(): self.__drop() super().put_nowait(item) async def put(self, item): - # Queue.put blocks when full, so we must override it. - # Since our put_nowait never raises QueueFull, we can just - # call it directly + """Queue.put blocks when full, so we must override it. + + Since our put_nowait never raises QueueFull, we can just call it directly. + """ self.put_nowait(item) From 9c299787d10e6fefef23bc15cb43bba98cd1ce58 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Thu, 17 Jul 2025 10:43:47 +0200 Subject: [PATCH 09/34] Ruff fixes --- plugwise_usb/connection/queue.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index 9fc684469..b96ecfc68 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -5,9 +5,10 @@ from asyncio import Queue, Task, get_running_loop, sleep from collections.abc import Callable from dataclasses import dataclass -from sortedcontainers import SortedList import logging +from sortedcontainers import SortedList + from ..api import StickEvent from ..exceptions import MessageError, NodeTimeout, StickError, StickTimeout from ..messages import Priority @@ -31,6 +32,7 @@ class DroppingPriorityQueue(Queue): Older entries are dropped when the queue reaches its maximum size. """ + def _init(self, maxsize): # called by asyncio.Queue.__init__ self._queue = SortedList() @@ -52,7 +54,7 @@ def __drop(self): self.task_done() def put_nowait(self, item): - """ Override method for queue.put.""" + """Override method for queue.put.""" if self.full(): self.__drop() super().put_nowait(item) @@ -72,7 +74,9 @@ def __init__(self) -> None: """Initialize the message session controller.""" self._stick: StickConnectionManager | None = None self._loop = get_running_loop() - self._submit_queue: DroppingPriorityQueue[PlugwiseRequest] = DroppingPriorityQueue(maxsize=56) + self._submit_queue: DroppingPriorityQueue[PlugwiseRequest] = ( + DroppingPriorityQueue(maxsize=56) + ) self._submit_worker_task: Task[None] | None = None self._unsubscribe_connection_events: Callable[[], None] | None = None self._running = False @@ -187,7 +191,9 @@ async def _send_queue_worker(self) -> None: return if self._stick.queue_depth > 3: - _LOGGER.warning("Awaiting plugwise responses %d", self._stick.queue_depth) + _LOGGER.warning( + "Awaiting plugwise responses %d", self._stick.queue_depth + ) await sleep(0.125) await self._stick.write_to_stick(request) From d57fb2dbe5835c78b926ce464616ec8dfb0c0473 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Thu, 17 Jul 2025 10:48:57 +0200 Subject: [PATCH 10/34] Set to v0.44.8a0 test-version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 5804e217e..8820995d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "plugwise_usb" -version = "0.44.7" +version = "0.44.8a0" license = "MIT" keywords = ["home", "automation", "plugwise", "module", "usb"] classifiers = [ From 9385d1f216aa444086428154eb72e2c779ea5868 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Thu, 17 Jul 2025 10:58:11 +0200 Subject: [PATCH 11/34] Improve docstring --- plugwise_usb/connection/queue.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index b96ecfc68..a46ea8178 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -28,9 +28,10 @@ class RequestState: class DroppingPriorityQueue(Queue): - """Define a queue that has a maximum size. + """Define a priority queue that has a maximum size. - Older entries are dropped when the queue reaches its maximum size. + First lowest-priority items and then oldest items are dropped + when the queue reaches its maximum size. """ def _init(self, maxsize): From a8f855a0e22def2640b3457ea41bc81c1bf202f6 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Thu, 17 Jul 2025 19:59:13 +0200 Subject: [PATCH 12/34] Add dropped message counter and use it to recalculate queue depth --- plugwise_usb/connection/queue.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index a46ea8178..3c61e6b37 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -36,6 +36,7 @@ class DroppingPriorityQueue(Queue): def _init(self, maxsize): # called by asyncio.Queue.__init__ + self.dropped_msgs = 0 self._queue = SortedList() def _put(self, item): @@ -50,6 +51,7 @@ def _get(self): def __drop(self): # drop the last (least important) item from the queue self._queue.pop() + self.dropped_msgs += 1 # no consumer will get a chance to process this item, so # we must decrement the unfinished count ourselves self.task_done() @@ -191,9 +193,10 @@ async def _send_queue_worker(self) -> None: self._submit_queue.task_done() return - if self._stick.queue_depth > 3: + queue_depth = self._stick.queue_depth - self._submit_queue.dropped_msgs + if queue_depth > 3: _LOGGER.warning( - "Awaiting plugwise responses %d", self._stick.queue_depth + "Awaiting plugwise responses %d", queue_depth ) await sleep(0.125) From 1aa3a6d20a13215981dc0671eeb50aa632271b72 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Thu, 17 Jul 2025 19:59:55 +0200 Subject: [PATCH 13/34] Remove HOI-log-message --- plugwise_usb/connection/queue.py | 1 - 1 file changed, 1 deletion(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index 3c61e6b37..4452397f7 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -177,7 +177,6 @@ async def _add_request_to_queue(self, request: PlugwiseRequest) -> None: """Add request to send queue.""" _LOGGER.debug("Add request to queue: %s", request) await self._submit_queue.put(request) - _LOGGER.debug("HOI queue maxsize: %s", self._submit_queue.maxsize) if self._submit_worker_task is None or self._submit_worker_task.done(): self._submit_worker_task = self._loop.create_task( self._send_queue_worker(), name="Send queue worker" From 70931fc00fe1b95aec1ddce76eb85d55996bc68a Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Thu, 17 Jul 2025 20:01:13 +0200 Subject: [PATCH 14/34] Ruff fix --- plugwise_usb/connection/queue.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index 4452397f7..41a24b7e9 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -194,9 +194,7 @@ async def _send_queue_worker(self) -> None: queue_depth = self._stick.queue_depth - self._submit_queue.dropped_msgs if queue_depth > 3: - _LOGGER.warning( - "Awaiting plugwise responses %d", queue_depth - ) + _LOGGER.warning("Awaiting plugwise responses %d", queue_depth) await sleep(0.125) await self._stick.write_to_stick(request) From 4278a150ac6e6e8c3ccda08fff54cb2f947dd657 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Thu, 17 Jul 2025 20:01:37 +0200 Subject: [PATCH 15/34] Bump to a1 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 8820995d8..66200262b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "plugwise_usb" -version = "0.44.8a0" +version = "0.44.8a1" license = "MIT" keywords = ["home", "automation", "plugwise", "module", "usb"] classifiers = [ From 0271c41e73220bfa39435d66eba2e95c20f9e5a8 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Thu, 17 Jul 2025 20:06:51 +0200 Subject: [PATCH 16/34] Update logger-warnings --- plugwise_usb/connection/queue.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index 41a24b7e9..bf7b47290 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -194,7 +194,8 @@ async def _send_queue_worker(self) -> None: queue_depth = self._stick.queue_depth - self._submit_queue.dropped_msgs if queue_depth > 3: - _LOGGER.warning("Awaiting plugwise responses %d", queue_depth) + _LOGGER.warning("Awaiting responses %d", queue_depth) + _LOGGER.warning("Requests dropped: %d", self._submit_queue.dropped_msgs) await sleep(0.125) await self._stick.write_to_stick(request) From d1a7ddf2d1b5056487d8785a30c8ea53a2c71c28 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Thu, 17 Jul 2025 20:07:16 +0200 Subject: [PATCH 17/34] Bump to a2 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 66200262b..36257f9fc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "plugwise_usb" -version = "0.44.8a1" +version = "0.44.8a2" license = "MIT" keywords = ["home", "automation", "plugwise", "module", "usb"] classifiers = [ From 338007dfe79d3d8fae72260d1a76e28f5384dc2c Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Thu, 17 Jul 2025 20:24:10 +0200 Subject: [PATCH 18/34] Revert priority-comparing, as suggested --- plugwise_usb/messages/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugwise_usb/messages/__init__.py b/plugwise_usb/messages/__init__.py index e9c9306e6..6d2755ed8 100644 --- a/plugwise_usb/messages/__init__.py +++ b/plugwise_usb/messages/__init__.py @@ -83,7 +83,7 @@ def __gt__(self, other: PlugwiseMessage) -> bool: if self.seq_id is not None and other.seq_id is not None: return self.seq_id < other.seq_id return self.timestamp > other.timestamp - return self.priority.value < other.priority.value + return self.priority.value > other.priority.value def __lt__(self, other: PlugwiseMessage) -> bool: """Less than.""" @@ -91,7 +91,7 @@ def __lt__(self, other: PlugwiseMessage) -> bool: if self.seq_id is not None and other.seq_id is not None: return self.seq_id > other.seq_id return self.timestamp < other.timestamp - return self.priority.value > other.priority.value + return self.priority.value < other.priority.value def __ge__(self, other: PlugwiseMessage) -> bool: """Greater than or equal.""" @@ -99,7 +99,7 @@ def __ge__(self, other: PlugwiseMessage) -> bool: if self.seq_id is not None and other.seq_id is not None: return self.seq_id < other.seq_id return self.timestamp >= other.timestamp - return self.priority.value < other.priority.value + return self.priority.value > other.priority.value def __le__(self, other: PlugwiseMessage) -> bool: """Less than or equal.""" @@ -107,4 +107,4 @@ def __le__(self, other: PlugwiseMessage) -> bool: if self.seq_id is not None and other.seq_id is not None: return self.seq_id <= other.seq_id return self.timestamp <= other.timestamp - return self.priority.value > other.priority.value + return self.priority.value < other.priority.value From fb4a051e3e66c80a2991282d48428a35b23e5a7f Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Thu, 17 Jul 2025 20:24:52 +0200 Subject: [PATCH 19/34] Bump to a3 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 36257f9fc..90703a287 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "plugwise_usb" -version = "0.44.8a2" +version = "0.44.8a3" license = "MIT" keywords = ["home", "automation", "plugwise", "module", "usb"] classifiers = [ From 72e421ed8d3494fd3a950a419d350de0e1e13331 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Thu, 17 Jul 2025 20:26:59 +0200 Subject: [PATCH 20/34] Change logger-type to warning to show the request being sent --- plugwise_usb/connection/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index bf7b47290..a79339e13 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -187,7 +187,7 @@ async def _send_queue_worker(self) -> None: _LOGGER.debug("Send_queue_worker started") while self._running and self._stick is not None: request = await self._submit_queue.get() - _LOGGER.debug("Sending from send queue %s", request) + _LOGGER.warning("Sending from send queue %s", request) if request.priority == Priority.CANCEL: self._submit_queue.task_done() return From ab6b112d6d224ee8144b6bb7051e4501c9c4453e Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Fri, 18 Jul 2025 08:17:13 +0200 Subject: [PATCH 21/34] Revert "Revert priority-comparing, as suggested" This reverts commit d2cc880c67f5613b3f97453b799e60e1a5f8f01b. --- plugwise_usb/messages/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugwise_usb/messages/__init__.py b/plugwise_usb/messages/__init__.py index 6d2755ed8..e9c9306e6 100644 --- a/plugwise_usb/messages/__init__.py +++ b/plugwise_usb/messages/__init__.py @@ -83,7 +83,7 @@ def __gt__(self, other: PlugwiseMessage) -> bool: if self.seq_id is not None and other.seq_id is not None: return self.seq_id < other.seq_id return self.timestamp > other.timestamp - return self.priority.value > other.priority.value + return self.priority.value < other.priority.value def __lt__(self, other: PlugwiseMessage) -> bool: """Less than.""" @@ -91,7 +91,7 @@ def __lt__(self, other: PlugwiseMessage) -> bool: if self.seq_id is not None and other.seq_id is not None: return self.seq_id > other.seq_id return self.timestamp < other.timestamp - return self.priority.value < other.priority.value + return self.priority.value > other.priority.value def __ge__(self, other: PlugwiseMessage) -> bool: """Greater than or equal.""" @@ -99,7 +99,7 @@ def __ge__(self, other: PlugwiseMessage) -> bool: if self.seq_id is not None and other.seq_id is not None: return self.seq_id < other.seq_id return self.timestamp >= other.timestamp - return self.priority.value > other.priority.value + return self.priority.value < other.priority.value def __le__(self, other: PlugwiseMessage) -> bool: """Less than or equal.""" @@ -107,4 +107,4 @@ def __le__(self, other: PlugwiseMessage) -> bool: if self.seq_id is not None and other.seq_id is not None: return self.seq_id <= other.seq_id return self.timestamp <= other.timestamp - return self.priority.value < other.priority.value + return self.priority.value > other.priority.value From d0f6464e960c830252fa2bf751327f0111d3aa1b Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Fri, 18 Jul 2025 08:24:28 +0200 Subject: [PATCH 22/34] Add request.priority to debug message --- plugwise_usb/connection/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index a79339e13..ccb74b646 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -187,7 +187,7 @@ async def _send_queue_worker(self) -> None: _LOGGER.debug("Send_queue_worker started") while self._running and self._stick is not None: request = await self._submit_queue.get() - _LOGGER.warning("Sending from send queue %s", request) + _LOGGER.debug("Sending from send queue %s with prio=%s", request, request.priority) if request.priority == Priority.CANCEL: self._submit_queue.task_done() return From bddd40b8a5a5c9935d23ea14a1ad6c6b60f9a5ab Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Fri, 18 Jul 2025 08:36:55 +0200 Subject: [PATCH 23/34] Add more logging --- plugwise_usb/connection/queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index ccb74b646..dc80585be 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -195,6 +195,7 @@ async def _send_queue_worker(self) -> None: queue_depth = self._stick.queue_depth - self._submit_queue.dropped_msgs if queue_depth > 3: _LOGGER.warning("Awaiting responses %d", queue_depth) + _LOGGER.warning("Queue contents: %s", self._submit_queue._queue) _LOGGER.warning("Requests dropped: %d", self._submit_queue.dropped_msgs) await sleep(0.125) From b15c9b27002c9fcb369c017d558f6d8e45b395c9 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Fri, 18 Jul 2025 08:37:56 +0200 Subject: [PATCH 24/34] Ruff fix --- plugwise_usb/connection/queue.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index dc80585be..eb512ec5a 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -187,7 +187,9 @@ async def _send_queue_worker(self) -> None: _LOGGER.debug("Send_queue_worker started") while self._running and self._stick is not None: request = await self._submit_queue.get() - _LOGGER.debug("Sending from send queue %s with prio=%s", request, request.priority) + _LOGGER.debug( + "Sending from send queue %s with prio=%s", request, request.priority + ) if request.priority == Priority.CANCEL: self._submit_queue.task_done() return From 236d99483c6392da9cf1069c39ebae6162f24673 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Fri, 18 Jul 2025 08:38:20 +0200 Subject: [PATCH 25/34] Bump to a4 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 90703a287..4d4be239a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "plugwise_usb" -version = "0.44.8a3" +version = "0.44.8a4" license = "MIT" keywords = ["home", "automation", "plugwise", "module", "usb"] classifiers = [ From 0db3eb03020223230a5a5930e1e1721ebdf3acd6 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Fri, 18 Jul 2025 10:06:42 +0200 Subject: [PATCH 26/34] Add increase correction-counter when a timeout occurs --- plugwise_usb/connection/queue.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index eb512ec5a..8a24b8c9a 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -153,10 +153,12 @@ async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse | None: "%s, cancel because timeout is expected for NodePingRequests", exc, ) + self._stick.correct_received_messages(1) elif request.resend: _LOGGER.debug("%s, retrying", exc) else: _LOGGER.warning("%s, cancel request", exc) # type: ignore[unreachable] + self._stick.correct_received_messages(1) except StickError as exc: _LOGGER.error(exc) self._stick.correct_received_messages(1) From 22d513982d17edae042d0d5ed1054437a631e702 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Fri, 18 Jul 2025 10:08:10 +0200 Subject: [PATCH 27/34] Bump to a5 --- plugwise_usb/connection/queue.py | 5 +++-- pyproject.toml | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index 8a24b8c9a..499e308c3 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -197,8 +197,9 @@ async def _send_queue_worker(self) -> None: return queue_depth = self._stick.queue_depth - self._submit_queue.dropped_msgs - if queue_depth > 3: - _LOGGER.warning("Awaiting responses %d", queue_depth) + if self._submit_queue.qsize > 3: + _LOGGER.warning("Awaiting responses (qsize) %d", self._submit_queue.qsize) + _LOGGER.warning("Awaiting responses (queu_depth) %d", queue_depth) _LOGGER.warning("Queue contents: %s", self._submit_queue._queue) _LOGGER.warning("Requests dropped: %d", self._submit_queue.dropped_msgs) await sleep(0.125) diff --git a/pyproject.toml b/pyproject.toml index 4d4be239a..9cb4ccdca 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "plugwise_usb" -version = "0.44.8a4" +version = "0.44.8a5" license = "MIT" keywords = ["home", "automation", "plugwise", "module", "usb"] classifiers = [ From cc2ae0571eda7f2bed242b51894e34a8f17781c8 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Fri, 18 Jul 2025 10:24:52 +0200 Subject: [PATCH 28/34] Use asyncio.Queue.qsize() --- plugwise_usb/connection/queue.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index 499e308c3..d5cc60ec9 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -197,8 +197,10 @@ async def _send_queue_worker(self) -> None: return queue_depth = self._stick.queue_depth - self._submit_queue.dropped_msgs - if self._submit_queue.qsize > 3: - _LOGGER.warning("Awaiting responses (qsize) %d", self._submit_queue.qsize) + if self._submit_queue.qsize() > 3: + _LOGGER.warning( + "Awaiting responses (qsize) %d", self._submit_queue.qsize() + ) _LOGGER.warning("Awaiting responses (queu_depth) %d", queue_depth) _LOGGER.warning("Queue contents: %s", self._submit_queue._queue) _LOGGER.warning("Requests dropped: %d", self._submit_queue.dropped_msgs) From 876769fcbc58b9fab97fd56ce9586ebf8e8fdd32 Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Fri, 18 Jul 2025 10:28:16 +0200 Subject: [PATCH 29/34] Bump to a6 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 9cb4ccdca..2582fcbe3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "plugwise_usb" -version = "0.44.8a5" +version = "0.44.8a6" license = "MIT" keywords = ["home", "automation", "plugwise", "module", "usb"] classifiers = [ From a6a94c33ec1038e2ab061375b4648cf2ce242eec Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Fri, 18 Jul 2025 12:13:01 +0200 Subject: [PATCH 30/34] Remove use of queue_depth using queue.qsize() is more accurate --- plugwise_usb/connection/manager.py | 5 ----- plugwise_usb/connection/queue.py | 2 -- 2 files changed, 7 deletions(-) diff --git a/plugwise_usb/connection/manager.py b/plugwise_usb/connection/manager.py index dbba049ef..5c8cb736d 100644 --- a/plugwise_usb/connection/manager.py +++ b/plugwise_usb/connection/manager.py @@ -36,11 +36,6 @@ def __init__(self) -> None: ] = {} self._unsubscribe_stick_events: Callable[[], None] | None = None - @property - def queue_depth(self) -> int: - """Return estimated size of pending responses.""" - return self._sender.processed_messages - self._receiver.processed_messages - def correct_received_messages(self, correction: int) -> None: """Correct received messages count.""" self._receiver.correct_processed_messages(correction) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index d5cc60ec9..24b4e02e2 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -196,12 +196,10 @@ async def _send_queue_worker(self) -> None: self._submit_queue.task_done() return - queue_depth = self._stick.queue_depth - self._submit_queue.dropped_msgs if self._submit_queue.qsize() > 3: _LOGGER.warning( "Awaiting responses (qsize) %d", self._submit_queue.qsize() ) - _LOGGER.warning("Awaiting responses (queu_depth) %d", queue_depth) _LOGGER.warning("Queue contents: %s", self._submit_queue._queue) _LOGGER.warning("Requests dropped: %d", self._submit_queue.dropped_msgs) await sleep(0.125) From 97cad7b4083c586ce0302110af980f70ac414dac Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Fri, 18 Jul 2025 13:02:50 +0200 Subject: [PATCH 31/34] Remove all request/response counter related --- plugwise_usb/connection/manager.py | 4 ---- plugwise_usb/connection/queue.py | 4 ---- plugwise_usb/connection/receiver.py | 11 ----------- plugwise_usb/connection/sender.py | 7 ------- 4 files changed, 26 deletions(-) diff --git a/plugwise_usb/connection/manager.py b/plugwise_usb/connection/manager.py index 5c8cb736d..7dea480be 100644 --- a/plugwise_usb/connection/manager.py +++ b/plugwise_usb/connection/manager.py @@ -36,10 +36,6 @@ def __init__(self) -> None: ] = {} self._unsubscribe_stick_events: Callable[[], None] | None = None - def correct_received_messages(self, correction: int) -> None: - """Correct received messages count.""" - self._receiver.correct_processed_messages(correction) - @property def serial_path(self) -> str: """Return current port.""" diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index 24b4e02e2..9d58d5ed4 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -153,21 +153,17 @@ async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse | None: "%s, cancel because timeout is expected for NodePingRequests", exc, ) - self._stick.correct_received_messages(1) elif request.resend: _LOGGER.debug("%s, retrying", exc) else: _LOGGER.warning("%s, cancel request", exc) # type: ignore[unreachable] - self._stick.correct_received_messages(1) except StickError as exc: _LOGGER.error(exc) - self._stick.correct_received_messages(1) raise StickError( f"No response received for {request.__class__.__name__} " + f"to {request.mac_decoded}" ) from exc except BaseException as exc: - self._stick.correct_received_messages(1) raise StickError( f"No response received for {request.__class__.__name__} " + f"to {request.mac_decoded}" diff --git a/plugwise_usb/connection/receiver.py b/plugwise_usb/connection/receiver.py index 5caf56266..4651dab6f 100644 --- a/plugwise_usb/connection/receiver.py +++ b/plugwise_usb/connection/receiver.py @@ -99,7 +99,6 @@ def __init__( self._data_worker_task: Task[None] | None = None # Message processing - self._processed_msgs = 0 self._message_queue: PriorityQueue[PlugwiseResponse] = PriorityQueue() self._last_processed_messages: list[bytes] = [] self._current_seq_id: bytes | None = None @@ -138,20 +137,11 @@ def connection_lost(self, exc: Exception | None = None) -> None: self._transport = None self._connection_state = False - @property - def processed_messages(self) -> int: - """Return the number of processed messages.""" - return self._processed_msgs - @property def is_connected(self) -> bool: """Return current connection state of the USB-Stick.""" return self._connection_state - def correct_processed_messages(self, correction: int) -> None: - """Correct the number of processed messages.""" - self._processed_msgs += correction - def connection_made(self, transport: SerialTransport) -> None: """Call when the serial connection to USB-Stick is established.""" _LOGGER.info("Connection made") @@ -291,7 +281,6 @@ async def _message_queue_worker(self) -> None: await self._notify_stick_subscribers(response) else: await self._notify_node_response_subscribers(response) - self._processed_msgs += 1 self._message_queue.task_done() await sleep(0) _LOGGER.debug("Message queue worker stopped") diff --git a/plugwise_usb/connection/sender.py b/plugwise_usb/connection/sender.py index f12709849..36030f13e 100644 --- a/plugwise_usb/connection/sender.py +++ b/plugwise_usb/connection/sender.py @@ -38,17 +38,11 @@ def __init__(self, stick_receiver: StickReceiver, transport: Transport) -> None: self._loop = get_running_loop() self._receiver = stick_receiver self._transport = transport - self._processed_msgs = 0 self._stick_response: Future[StickResponse] | None = None self._stick_lock = Lock() self._current_request: None | PlugwiseRequest = None self._unsubscribe_stick_response: Callable[[], None] | None = None - @property - def processed_messages(self) -> int: - """Return the number of processed messages.""" - return self._processed_msgs - async def start(self) -> None: """Start the sender.""" # Subscribe to ACCEPT stick responses, which contain the seq_id we need. @@ -149,7 +143,6 @@ async def write_request_to_port(self, request: PlugwiseRequest) -> None: finally: self._stick_response.cancel() self._stick_lock.release() - self._processed_msgs += 1 async def _process_stick_response(self, response: StickResponse) -> None: """Process stick response.""" From 70bc4a020c65df717fc3e50d1c17118c6023a66f Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Fri, 18 Jul 2025 13:15:12 +0200 Subject: [PATCH 32/34] Add comment about queue rate limiting --- plugwise_usb/connection/queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/plugwise_usb/connection/queue.py b/plugwise_usb/connection/queue.py index 9d58d5ed4..0c051bf19 100644 --- a/plugwise_usb/connection/queue.py +++ b/plugwise_usb/connection/queue.py @@ -198,6 +198,7 @@ async def _send_queue_worker(self) -> None: ) _LOGGER.warning("Queue contents: %s", self._submit_queue._queue) _LOGGER.warning("Requests dropped: %d", self._submit_queue.dropped_msgs) + # When the queue size grows, rate-limit the sending of requests to avoid overloading the network await sleep(0.125) await self._stick.write_to_stick(request) From f0ef47e7ae50197050acca9bd7ccfc1b18993d4c Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Fri, 18 Jul 2025 13:08:04 +0200 Subject: [PATCH 33/34] Test: fix missing await by removing double line --- tests/test_usb.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_usb.py b/tests/test_usb.py index 04063deb3..187d24cbf 100644 --- a/tests/test_usb.py +++ b/tests/test_usb.py @@ -1691,7 +1691,6 @@ async def makedirs(cache_dir: str, exist_ok: bool) -> None: "FEDCBA9876543210": pw_api.NodeType.CIRCLE, "1298347650AFBECD": pw_api.NodeType.SCAN, } - pw_nw_cache.update_nodetypes("1234ABCD4321FEDC", pw_api.NodeType.STEALTH) with patch("aiofiles.threadpool.sync_open", return_value=mock_file_stream): # await pw_nw_cache.save_cache() From 0d844a9ff4a5e8ca8c097d43a5df63844e60acff Mon Sep 17 00:00:00 2001 From: Bouwe Westerdijk Date: Fri, 18 Jul 2025 13:31:23 +0200 Subject: [PATCH 34/34] Bump to a7 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 2582fcbe3..c17c32ac0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "plugwise_usb" -version = "0.44.8a6" +version = "0.44.8a7" license = "MIT" keywords = ["home", "automation", "plugwise", "module", "usb"] classifiers = [