Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions deebot_client/commands/json/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
GetCachedMapInfo,
GetMajorMap,
GetMapSet,
GetMapSetV2,
GetMapSubSet,
GetMapTrace,
GetMinorMap,
Expand Down Expand Up @@ -73,6 +74,7 @@
"GetCachedMapInfo",
"GetMajorMap",
"GetMapSet",
"GetMapSetV2",
"GetMapSubSet",
"GetMapTrace",
"GetMinorMap",
Expand Down Expand Up @@ -144,6 +146,7 @@
GetCachedMapInfo,
GetMajorMap,
GetMapSet,
GetMapSetV2,
GetMapSubSet,
GetMapTrace,
GetMinorMap,
Expand Down
108 changes: 94 additions & 14 deletions deebot_client/commands/json/map.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Maps commands."""
from __future__ import annotations

import json
from types import MappingProxyType
from typing import TYPE_CHECKING, Any

Expand All @@ -15,6 +16,7 @@
)
from deebot_client.events.map import CachedMapInfoEvent
from deebot_client.message import HandlingResult, HandlingState, MessageBodyDataDict
from deebot_client.util import decompress_7z_base64_data

from .common import JsonCommandWithMessageHandling

Expand All @@ -26,6 +28,22 @@ class GetCachedMapInfo(JsonCommandWithMessageHandling, MessageBodyDataDict):
"""Get cached map info command."""

name = "getCachedMapInfo"
# version definition for using type of getMapSet v1 or v2
_map_set_command: type[GetMapSet | GetMapSetV2]

def __init__(
self, args: dict[str, Any] | list[Any] | None = None, version: int = 1
) -> None:
match version:
case 1:
self._map_set_command = GetMapSet
case 2:
self._map_set_command = GetMapSetV2
case _:
error_wrong_version = f"version={version} is not supported"
raise ValueError(error_wrong_version)

super().__init__(args)

@classmethod
def _handle_body_data_dict(
Expand Down Expand Up @@ -59,7 +77,10 @@ def _handle_response(
return CommandResult(
result.state,
result.args,
[GetMapSet(result.args["map_id"], entry) for entry in MapSetType],
[
self._map_set_command(result.args["map_id"], entry)
for entry in MapSetType
],
)

return result
Expand Down Expand Up @@ -131,16 +152,24 @@ def _handle_body_data_dict(

:return: A message response
"""
subsets = [int(subset["mssid"]) for subset in data["subsets"]]
args = {
cls._ARGS_ID: data["mid"],
cls._ARGS_SET_ID: data.get("msid", None),
cls._ARGS_TYPE: data["type"],
cls._ARGS_SUBSETS: subsets,
}
if not MapSetType.has_value(data["type"]) or not data.get("subsets"):
return HandlingResult.analyse()

event_bus.notify(MapSetEvent(MapSetType(data["type"]), subsets))
return HandlingResult(HandlingState.SUCCESS, args)
if subset_ids := cls._get_subset_ids(event_bus, data):
event_bus.notify(MapSetEvent(MapSetType(data["type"]), subset_ids))
args = {
cls._ARGS_ID: data["mid"],
cls._ARGS_SET_ID: data.get("msid"),
cls._ARGS_TYPE: data["type"],
cls._ARGS_SUBSETS: subset_ids,
}
return HandlingResult(HandlingState.SUCCESS, args)
return HandlingResult(HandlingState.SUCCESS)

@classmethod
def _get_subset_ids(cls, _: EventBus, data: dict[str, Any]) -> list[int] | None:
"""Return subset ids."""
return [int(subset["mssid"]) for subset in data["subsets"]]

def _handle_response(
self, event_bus: EventBus, response: dict[str, Any]
Expand Down Expand Up @@ -205,7 +234,8 @@ def __init__(
type = type.value

if msid is None and type == MapSetType.ROOMS.value:
raise ValueError("msid is required when type='vw'")
error_msid_type = f"msid is required when type='{MapSetType.ROOMS.value}'"
raise ValueError(error_msid_type)

super().__init__(
{
Expand All @@ -225,18 +255,26 @@ def _handle_body_data_dict(
:return: A message response
"""
if MapSetType.has_value(data["type"]):
subtype = data.get("subtype", data.get("subType", None))
subtype = data.get("subtype", data.get("subType"))
name = None
if subtype == "15":
name = data.get("name", None)
name = data.get("name")
elif subtype:
name = cls._ROOM_NUM_TO_NAME.get(subtype, None)

# This command is used by new and old bots
if data.get("compress", 0) == 1:
# Newer bot's return coordinates as base64 decoded string
coordinates = decompress_7z_base64_data(data["value"]).decode()
else:
# Older bot's return coordinates direct as comma/semicolon separated list
coordinates = data["value"]

event_bus.notify(
MapSubsetEvent(
id=int(data["mssid"]),
type=MapSetType(data["type"]),
coordinates=data["value"],
coordinates=coordinates,
name=name,
)
)
Expand All @@ -246,6 +284,48 @@ def _handle_body_data_dict(
return HandlingResult.analyse()


class GetMapSetV2(GetMapSet):
"""Get map set v2 command."""

name = "getMapSet_V2"

@classmethod
def _get_subset_ids(
cls, event_bus: EventBus, data: dict[str, Any]
) -> list[int] | None:
"""Return subset ids."""
# subset is based64 7z compressed
subsets = json.loads(decompress_7z_base64_data(data["subsets"]).decode())

match data["type"]:
case MapSetType.ROOMS:
# subset values
# 1 -> id
# 2 -> unknown
# 3 -> unknown
# 4 -> room clean order
# 5 -> room center x
# 6 -> room center y
# 7 -> room clean configs as '<count>-<speed>-<water>'
# 8 -> named all as 'settingName1'
return [int(subset[0]) for subset in subsets]

case MapSetType.VIRTUAL_WALLS | MapSetType.NO_MOP_ZONES:
for subset in subsets:
mssid = subset[0] # first entry in list is mssid
coordinates = str(subset[1:]) # all other in list are coordinates

event_bus.notify(
MapSubsetEvent(
id=int(mssid),
type=MapSetType(data["type"]),
coordinates=coordinates,
)
)

return None


class GetMapTrace(JsonCommandWithMessageHandling, MessageBodyDataDict):
"""Get map trace command."""

Expand Down
8 changes: 4 additions & 4 deletions deebot_client/hardware/deebot/p95mgv.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
from deebot_client.commands.json.error import GetError
from deebot_client.commands.json.fan_speed import GetFanSpeed, SetFanSpeed
from deebot_client.commands.json.life_span import GetLifeSpan, ResetLifeSpan

# getMapSet
from deebot_client.commands.json.map import (
GetCachedMapInfo,
GetMajorMap,
Expand Down Expand Up @@ -160,15 +158,17 @@
reset=ResetLifeSpan,
),
map=CapabilityMap(
chached_info=CapabilityEvent(CachedMapInfoEvent, [GetCachedMapInfo()]),
chached_info=CapabilityEvent(
CachedMapInfoEvent, [GetCachedMapInfo(version=2)]
),
changed=CapabilityEvent(MapChangedEvent, []),
major=CapabilityEvent(MajorMapEvent, [GetMajorMap()]),
multi_state=CapabilitySetEnable(
MultimapStateEvent, [GetMultimapState()], SetMultimapState
),
position=CapabilityEvent(PositionsEvent, [GetPos()]),
relocation=CapabilityExecute(SetRelocationState),
rooms=CapabilityEvent(RoomsEvent, [GetCachedMapInfo()]),
rooms=CapabilityEvent(RoomsEvent, [GetCachedMapInfo(version=2)]),
trace=CapabilityEvent(MapTraceEvent, [GetMapTrace()]),
),
network=CapabilityEvent(NetworkInfoEvent, [GetNetInfo()]),
Expand Down
36 changes: 9 additions & 27 deletions deebot_client/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from decimal import Decimal
from io import BytesIO
import itertools
import lzma
import struct
from typing import TYPE_CHECKING, Any, Final
import zlib
Expand All @@ -35,7 +34,11 @@
from .exceptions import MapError
from .logging_filter import get_logger
from .models import Room
from .util import OnChangedDict, OnChangedList
from .util import (
OnChangedDict,
OnChangedList,
decompress_7z_base64_data,
)

if TYPE_CHECKING:
from collections.abc import Callable, Coroutine, Sequence
Expand Down Expand Up @@ -117,7 +120,7 @@ class _PositionSvg:
_TRACE_MAP = "trace_map"
_COLORS = {
_TRACE_MAP: "#fff",
MapSetType.VIRTUAL_WALLS: "#f00",
MapSetType.VIRTUAL_WALLS: "#f00000",
MapSetType.NO_MOP_ZONES: "#ffa500",
}
_DEFAULT_MAP_BACKGROUND_COLOR = ImageColor.getrgb("#badaff") # floor
Expand Down Expand Up @@ -233,27 +236,6 @@ class BackgroundImage:
)


def _decompress_7z_base64_data(data: str) -> bytes:
_LOGGER.debug("[decompress7zBase64Data] Begin")
final_array = bytearray()

# Decode Base64
decoded = base64.b64decode(data)

i = 0
for idx in decoded:
if i == 8:
final_array += b"\x00\x00\x00\x00"
final_array.append(idx)
i += 1

dec = lzma.LZMADecompressor(lzma.FORMAT_AUTO, None, None)
decompressed_data = dec.decompress(final_array)

_LOGGER.debug("[decompress7zBase64Data] Done")
return decompressed_data


def _calc_value(value: float, axis_manipulation: AxisManipulation) -> float:
try:
if value is not None:
Expand Down Expand Up @@ -352,7 +334,7 @@ def _get_svg_subset(

# For any other points count, return a polygon that should fit any required shape
return svg.Polygon(
fill=_COLORS[subset.type] + "90", # Set alpha channel to 90 for fill color
fill=_COLORS[subset.type] + "30", # Set alpha channel to 30 for fill color
stroke=_COLORS[subset.type],
stroke_width=1.5,
stroke_dasharray=[4],
Expand Down Expand Up @@ -432,7 +414,7 @@ async def on_map_subset(event: MapSubsetEvent) -> None:

def _update_trace_points(self, data: str) -> None:
_LOGGER.debug("[_update_trace_points] Begin")
trace_points = _decompress_7z_base64_data(data)
trace_points = decompress_7z_base64_data(data)

for i in range(0, len(trace_points), 5):
position_x, position_y = struct.unpack("<hh", trace_points[i : i + 4])
Expand Down Expand Up @@ -683,7 +665,7 @@ def image(self) -> Image.Image:

def update_points(self, base64_data: str) -> None:
"""Add map piece points."""
decoded = _decompress_7z_base64_data(base64_data)
decoded = decompress_7z_base64_data(base64_data)
old_crc32 = self._crc32
self._crc32 = zlib.crc32(decoded)

Expand Down
8 changes: 7 additions & 1 deletion deebot_client/messages/json/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@
from typing import TYPE_CHECKING

from .battery import OnBattery
from .map import OnMapSetV2
from .stats import ReportStats

if TYPE_CHECKING:
from deebot_client.message import Message

__all__ = ["OnBattery", "ReportStats"]
__all__ = [
"OnBattery",
"OnMapSetV2",
"ReportStats",
]

# fmt: off
# ordered by file asc
_MESSAGES: list[type[Message]] = [
OnBattery,
OnMapSetV2,

ReportStats
]
Expand Down
35 changes: 35 additions & 0 deletions deebot_client/messages/json/map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Map set v2 messages."""
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from deebot_client.events.map import MapSetType
from deebot_client.message import HandlingResult, HandlingState, MessageBodyDataDict

if TYPE_CHECKING:
from deebot_client.event_bus import EventBus


class OnMapSetV2(MessageBodyDataDict):
"""On map set v2 message."""

name = "onMapSet_V2"

@classmethod
def _handle_body_data_dict(
cls, _: EventBus, data: dict[str, Any]
) -> HandlingResult:
"""Handle message->body->data and notify the correct event subscribers.

:return: A message response
"""
# check if type is know and mid us given
if not MapSetType.has_value(data["type"]) or not data.get("mid"):
return HandlingResult.analyse()

# NOTE: here would be needed to call 'GetMapSetV2' again with 'mid' and 'type',
# that on event will update the map set changes,
# messages current cannot call commands again
return HandlingResult(
HandlingState.SUCCESS, {"mid": data["mid"], "type": data["type"]}
)
Loading