Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions plugwise_usb/connection/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ def __init__(self) -> None:
] = {}
self._unsubscribe_stick_events: Callable[[], None] | None = None

@property
def queue_depth(self) -> int:
return self._sender.processed_messages - self._receiver.processed_messages

def correct_received_messages(self, correction: int) -> None:
self._receiver.correct_processed_messages(correction)

@property
def serial_path(self) -> str:
"""Return current port."""
Expand Down
16 changes: 12 additions & 4 deletions plugwise_usb/connection/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@
self._stick = None
_LOGGER.debug("queue stopped")

async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse:
async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse | None:
"""Add request to queue and return the response of node. Raises an error when something fails."""
if request.waiting_for_response:
raise MessageError(
f"Cannot send message {request} which is currently waiting for response."
)

while request.resend and not request.waiting_for_response:
_LOGGER.warning("submit | start (%s) %s", request.retries_left, request)
_LOGGER.debug("submit | start (%s) %s", request.retries_left, request)
if not self._running or self._stick is None:
raise StickError(
f"Cannot send message {request.__class__.__name__} for"
Expand All @@ -91,6 +91,7 @@
await self._add_request_to_queue(request)
try:
response: PlugwiseResponse = await request.response_future()
return response
except (NodeTimeout, StickTimeout) as e:
if isinstance(request, NodePingRequest):
# For ping requests it is expected to receive timeouts, so lower log level
Expand All @@ -103,17 +104,19 @@
_LOGGER.warning("%s, cancel request", e) # type: ignore[unreachable]
except StickError as exception:
_LOGGER.error(exception)
self._stick.correct_received_messages(1)

Check warning on line 107 in plugwise_usb/connection/queue.py

View check run for this annotation

Codecov / codecov/patch

plugwise_usb/connection/queue.py#L107

Added line #L107 was not covered by tests
raise StickError(
f"No response received for {request.__class__.__name__} "
+ f"to {request.mac_decoded}"
) from exception
except BaseException as exception:
self._stick.correct_received_messages(1)
raise StickError(
f"No response received for {request.__class__.__name__} "
+ f"to {request.mac_decoded}"
) from exception

return response
return None

Check warning on line 119 in plugwise_usb/connection/queue.py

View check run for this annotation

Codecov / codecov/patch

plugwise_usb/connection/queue.py#L119

Added line #L119 was not covered by tests

async def _add_request_to_queue(self, request: PlugwiseRequest) -> None:
"""Add request to send queue."""
Expand All @@ -133,8 +136,13 @@
if request.priority == Priority.CANCEL:
self._submit_queue.task_done()
return

while self._stick.queue_depth > 3:
_LOGGER.info("Awaiting plugwise responses %d", self._stick.queue_depth)
await sleep(0.125)

await self._stick.write_to_stick(request)
self._submit_queue.task_done()
await sleep(0.001)

_LOGGER.debug("Sent from queue %s", request)
_LOGGER.debug("Send_queue_worker stopped")
13 changes: 12 additions & 1 deletion plugwise_usb/connection/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def __init__(
self._data_worker_task: Task[None] | None = None

# Message processing
self._processed_msgs = 0
self._message_queue: PriorityQueue[PlugwiseResponse] = PriorityQueue()
self._last_processed_messages: list[bytes] = []
self._current_seq_id: bytes | None = None
Expand Down Expand Up @@ -137,11 +138,20 @@ def connection_lost(self, exc: Exception | None = None) -> None:
self._transport = None
self._connection_state = False

@property
def processed_messages(self) -> int:
"""Return the number of processed messages."""
return self._processed_msgs

@property
def is_connected(self) -> bool:
"""Return current connection state of the USB-Stick."""
return self._connection_state

def correct_processed_messages(self, correction: int) -> None:
"""Return the number of processed messages."""
self._processed_msgs += correction

def connection_made(self, transport: SerialTransport) -> None:
"""Call when the serial connection to USB-Stick is established."""
_LOGGER.info("Connection made")
Expand Down Expand Up @@ -278,6 +288,7 @@ async def _message_queue_worker(self) -> None:
await self._notify_stick_subscribers(response)
else:
await self._notify_node_response_subscribers(response)
self._processed_msgs += 1
self._message_queue.task_done()
await sleep(0)
_LOGGER.debug("Message queue worker stopped")
Expand Down Expand Up @@ -457,7 +468,7 @@ async def _notify_node_response_subscribers(

self._node_subscription_lock.release()
if len(notify_tasks) > 0:
_LOGGER.debug("Received %s", node_response)
_LOGGER.debug("Received %s %s", node_response, node_response.seq_id)
if node_response.seq_id not in BROADCAST_IDS:
self._last_processed_messages.append(node_response.seq_id)
# Limit tracking to only the last appended request (FIFO)
Expand Down
7 changes: 7 additions & 0 deletions plugwise_usb/connection/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,17 @@ def __init__(self, stick_receiver: StickReceiver, transport: Transport) -> None:
self._loop = get_running_loop()
self._receiver = stick_receiver
self._transport = transport
self._processed_msgs = 0
self._stick_response: Future[StickResponse] | None = None
self._stick_lock = Lock()
self._current_request: None | PlugwiseRequest = None
self._unsubscribe_stick_response: Callable[[], None] | None = None

@property
def processed_messages(self) -> int:
"""Return the number of processed messages."""
return self._processed_msgs

async def start(self) -> None:
"""Start the sender."""
# Subscribe to ACCEPT stick responses, which contain the seq_id we need.
Expand Down Expand Up @@ -133,6 +139,7 @@ async def write_request_to_port(self, request: PlugwiseRequest) -> None:
finally:
self._stick_response.cancel()
self._stick_lock.release()
self._processed_msgs += 1

async def _process_stick_response(self, response: StickResponse) -> None:
"""Process stick response."""
Expand Down
6 changes: 2 additions & 4 deletions plugwise_usb/helpers/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,8 @@ async def initialize_cache(self, create_root_folder: bool = False) -> None:
cache_dir = self._get_writable_os_dir()
await makedirs(cache_dir, exist_ok=True)
self._cache_path = cache_dir
if os_name == "nt":
self._cache_file = f"{cache_dir}\\{self._file_name}"
else:
self._cache_file = f"{cache_dir}/{self._file_name}"

self._cache_file = os_path_join(self._cache_path, self._file_name)
self._cache_file_exists = await ospath.exists(self._cache_file)
self._initialized = True
_LOGGER.debug("Start using network cache file: %s", self._cache_file)
Expand Down
18 changes: 7 additions & 11 deletions plugwise_usb/nodes/circle.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from __future__ import annotations

from asyncio import Task, create_task, gather
from asyncio import Task, create_task
from collections.abc import Awaitable, Callable
from dataclasses import replace
from datetime import UTC, datetime
Expand Down Expand Up @@ -453,11 +453,8 @@
log_address, _ = calc_log_address(log_address, 1, -4)
total_addresses -= 1

if not all(await gather(*log_update_tasks)):
_LOGGER.info(
"Failed to request one or more update energy log for %s",
self._mac_in_str,
)
for task in log_update_tasks:
await task

Check warning on line 457 in plugwise_usb/nodes/circle.py

View check run for this annotation

Codecov / codecov/patch

plugwise_usb/nodes/circle.py#L456-L457

Added lines #L456 - L457 were not covered by tests

if self._cache_enabled:
await self._energy_log_records_save_to_cache()
Expand All @@ -475,9 +472,8 @@
)

missing_addresses = sorted(missing_addresses, reverse=True)
await gather(
*[self.energy_log_update(address) for address in missing_addresses]
)
for address in missing_addresses:
await self.energy_log_update(address)

Check warning on line 476 in plugwise_usb/nodes/circle.py

View check run for this annotation

Codecov / codecov/patch

plugwise_usb/nodes/circle.py#L475-L476

Added lines #L475 - L476 were not covered by tests

if self._cache_enabled:
await self._energy_log_records_save_to_cache()
Expand Down Expand Up @@ -528,7 +524,7 @@
"""Load energy_log_record from cache."""
cache_data = self._get_cache(CACHE_ENERGY_COLLECTION)
if (cache_data := self._get_cache(CACHE_ENERGY_COLLECTION)) is None:
_LOGGER.debug(
_LOGGER.warning(

Check warning on line 527 in plugwise_usb/nodes/circle.py

View check run for this annotation

Codecov / codecov/patch

plugwise_usb/nodes/circle.py#L527

Added line #L527 was not covered by tests
"Failed to restore energy log records from cache for node %s", self.name
)
return False
Expand Down Expand Up @@ -811,7 +807,7 @@
return False
# Energy collection
if await self._energy_log_records_load_from_cache():
_LOGGER.debug(
_LOGGER.warning(

Check warning on line 810 in plugwise_usb/nodes/circle.py

View check run for this annotation

Codecov / codecov/patch

plugwise_usb/nodes/circle.py#L810

Added line #L810 was not covered by tests
"Node %s failed to load energy_log_records from cache",
self._mac_in_str,
)
Expand Down
10 changes: 6 additions & 4 deletions plugwise_usb/nodes/helpers/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from dataclasses import dataclass
from typing import Any


from ...api import NodeFeature


Expand All @@ -20,11 +21,12 @@ class NodeFeatureSubscription:

class FeaturePublisher:
"""Base Class to call awaitable of subscription when event happens."""
def __init__(self) -> None:
self._feature_update_subscribers: dict[
Callable[[], None],
NodeFeatureSubscription,
] = {}

_feature_update_subscribers: dict[
Callable[[], None],
NodeFeatureSubscription,
] = {}

def subscribe_to_feature_update(
self,
Expand Down
4 changes: 3 additions & 1 deletion plugwise_usb/nodes/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
loaded_callback: Callable[[NodeEvent, str], Awaitable[None]],
):
"""Initialize Plugwise base node class."""
super().__init__()
self._loaded_callback = loaded_callback
self._message_subscribe = controller.subscribe_to_messages
self._features: tuple[NodeFeature, ...] = NODE_FEATURES
Expand Down Expand Up @@ -415,7 +416,8 @@ async def _available_update_state(
if (
self._last_seen is not None
and timestamp is not None
and self._last_seen < timestamp
and (timestamp - self._last_seen).seconds > 5

):
self._last_seen = timestamp
await self.publish_feature_update_to_subscribers(
Expand Down
2 changes: 1 addition & 1 deletion tests/test_usb.py
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ async def test_node_relay_and_power(self, monkeypatch: pytest.MonkeyPatch) -> No
assert await stick.nodes["2222222222222222"].load()
self.test_init_relay_state_on = asyncio.Future()
self.test_init_relay_state_off = asyncio.Future()
unsub_inti_relay = stick.nodes["0098765432101234"].subscribe_to_feature_update(
unsub_inti_relay = stick.nodes["2222222222222222"].subscribe_to_feature_update(
node_feature_callback=self.node_init_relay_state,
features=(pw_api.NodeFeature.RELAY_INIT,),
)
Expand Down
Loading