Skip to content

Commit

Permalink
Support for multi-level vacuum firmware (#93)
Browse files Browse the repository at this point in the history
* Added complete protobuf definition

* Added support for listing maps and setting current map

Co-authored-by: Shaun Tarves <[email protected]>
  • Loading branch information
shauntarves authored Oct 5, 2022
1 parent 5fe8db8 commit 233d043
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 35 deletions.
24 changes: 23 additions & 1 deletion wyze_sdk/api/devices/vacuums.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from wyze_sdk.api.base import BaseClient
from wyze_sdk.models.devices import DeviceModels, Vacuum, VacuumSuctionLevel
from wyze_sdk.models.devices.vacuums import VacuumSweepRecord
from wyze_sdk.models.devices.vacuums import VacuumMapSummary, VacuumSweepRecord
from wyze_sdk.service import VenusServiceClient, WyzeResponse


Expand Down Expand Up @@ -104,6 +104,28 @@ def get_sweep_records(self, *, device_mac: str, limit: int = 20, since: datetime
"""
return [VacuumSweepRecord(**record) for record in super()._venus_client().get_sweep_records(did=device_mac, keys=[], limit=limit, since=since)["data"]["data"]]

def get_maps(self, *, device_mac: str, **kwargs) -> Sequence[VacuumMapSummary]:
"""Retrieves defined maps for a vacuum.
:param str device_mac: The device mac. e.g. ``JA_RO2_ABCDEF1234567890``
:rtype: Sequence[VacuumMapSummary]
"""
return [VacuumMapSummary(**map) for map in super()._venus_client().get_maps(did=device_mac)["data"]]

def set_current_map(self, *, device_mac: str, map_id: int, **kwargs) -> WyzeResponse:
"""Sets the current map of a vacuum.
Args:
:param str device_mac: The device mac. e.g. ``JA_RO2_ABCDEF1234567890``
:param int map_id: The new current map id. e.g. ``12345678``
:rtype: WyzeResponse
"""

return super()._venus_client().set_current_map(
did=device_mac, map_id=map_id)

def set_suction_level(self, *, device_mac: str, device_model: str, suction_level: VacuumSuctionLevel, **kwargs) -> WyzeResponse:
"""Sets the suction level of a vacuum.
Expand Down
237 changes: 203 additions & 34 deletions wyze_sdk/models/devices/vacuums.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,23 @@ def __init__(
name_position: VacuumMapPoint = None,
**others: dict
):
self._id = id if id else int(self._extract_attribute('id', others))
self._name = name if name else self._extract_attribute('name', others)
self._clean_state = clean_state if clean_state else int(self._extract_attribute('clean_state', others))
self._room_clean = room_clean if room_clean else int(self._extract_attribute('room_clean', others))
self._name_position = name_position if name_position else VacuumMapPoint(**self._extract_attribute('roomNamePost_', others))
self._id = id if id else int(self._extract_attribute('roomId_', others))
self._name = name if name else self._extract_attribute('roomName_', others)
if not clean_state:
clean_state = self._extract_attribute('cleanState_', others)
if clean_state:
clean_state = int(clean_state)
self._clean_state = clean_state
if not room_clean:
room_clean = self._extract_attribute('roomClean_', others)
if room_clean:
room_clean = int(room_clean)
self._room_clean = room_clean
if not name_position:
name_position = self._extract_attribute('roomNamePost_', others)
if name_position:
name_position = VacuumMapPoint(**name_position)
self._name_position = name_position
show_unknown_key_warning(self, others)

@property
Expand All @@ -197,15 +209,15 @@ def name(self) -> str:
return self._name

@property
def clean_state(self) -> int:
def clean_state(self) -> Optional[int]:
return self._clean_state

@property
def room_clean(self) -> int:
def room_clean(self) -> Optional[int]:
return self._room_clean

@property
def name_position(self) -> VacuumMapPoint:
def name_position(self) -> Optional[VacuumMapPoint]:
return self._name_position


Expand All @@ -217,11 +229,18 @@ class VacuumMap(JsonObject):
@classmethod
def _robot_map_proto(cls) -> dict:
return {
# Java type int
# 0 == REAL_TIME
# 1 == POINT
# 2 == AREA
# 3 == MEMORY
'1': {'type': 'int', 'name': 'mapType_'},
# Mapped from MapExtInfo to VenusMapExtraTimeBean
'2': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'taskBeginDate_'},
'2': {'type': 'int', 'name': 'mapUploadDate_'}
}, 'name': 'mapExtInfo_'},
# Mapped from MapHeadInfo to VenusMapHeadBean
'3': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'mapHeadId_'},
'2': {'type': 'int', 'name': 'sizeX_'},
Expand All @@ -232,72 +251,154 @@ def _robot_map_proto(cls) -> dict:
'7': {'type': 'float', 'name': 'maxY_'},
'8': {'type': 'float', 'name': 'resolution_'}
}, 'name': 'mapHeadInfo_'},
# Mapped from MapDataInfo to VenusMapContentBean
'4': {'type': 'message', 'message_typedef': {
'1': {'type': 'bytes', 'name': 'mapData_'}
}, 'name': 'mapData_'},
# Mapped from List<AllMapInfo> to List<VenusMapIdAndNameBean>
'5': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'mapHeadId_'},
'2': {'type': 'bytes', 'name': 'mapName_'}
}, 'name': 'mapInfo_'},
}, 'name': ''}, # mapInfo_
# Mapped from DeviceHistoryPoseInfo to VenusDeviceHistoryPoseBean
'6': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'poseId_'},
# Mapped from DeviceCoverPointDataInfo to VenusDeviceCoverPointBean
'2': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'update_'},
'2': {'type': 'float', 'name': 'x_'},
'3': {'type': 'float', 'name': 'y_'}
}, 'name': 'points_'},
'3': {'type': 'int', 'name': 'pathType_'}
}, 'name': ''},
}, 'name': 'historyPose_'},
# Mapped from DevicePoseDataInfo to VenusChargingPilePositionBean
'7': {'type': 'message', 'message_typedef': {
'1': {'type': 'float', 'name': 'x_'},
'2': {'type': 'float', 'name': 'y_'},
'3': {'type': 'float', 'name': 'phi_'}
}, 'name': 'chargeStation_'},
# Mapped from DeviceCurrentPoseInfo to VenusDeviceCurrentPositionBean
# currentPose_ only present when unit is active
'8': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'poseId_'},
'2': {'type': 'int', 'name': 'update_'},
'3': {'type': 'float', 'name': 'x_'},
'4': {'type': 'float', 'name': 'y_'},
'5': {'type': 'float', 'name': 'phi_'}
}, 'name': 'currentPose_'},
# 9: virtualWalls
# 10: areasInfo
# Mapped from List<DeviceAreaDataInfo> to List<VenusDeviceAreaBean>
'9': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'status_'},
'2': {'type': 'int', 'name': 'type_'},
'3': {'type': 'int', 'name': 'areaIndex_'},
# Mapped from List<DevicePointInfo> to List<VenusDeviceAreaBean.RoomPoint>
'4': {'type': 'message', 'message_typedef': {
'1': {'type': 'float', 'name': 'x_'},
'2': {'type': 'float', 'name': 'y_'}
}, 'name': 'points_'},
}, 'name': 'virtualWalls_'},
# Mapped from List<DeviceAreaDataInfo> to List<VenusDeviceAreaBean>
'10': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'status_'},
'2': {'type': 'int', 'name': 'type_'},
'3': {'type': 'int', 'name': 'areaIndex_'},
# Mapped from List<DevicePointInfo> to List<VenusDeviceAreaBean.RoomPoint>
'4': {'type': 'message', 'message_typedef': {
'1': {'type': 'float', 'name': 'x_'},
'2': {'type': 'float', 'name': 'y_'}
}, 'name': 'points_'},
}, 'name': 'areasInfo_'},
# Mapped from List<DeviceNavigationPointDataInfo> to List<VenusDeviceNavigationPointBean>
# navigationPoints_ only present when unit is active
'11': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'pointId_'},
'2': {'type': 'int', 'name': 'status_'},
'3': {'type': 'int', 'name': 'pointType_'},
'4': {'type': 'float', 'name': 'x_'},
'5': {'type': 'float', 'name': 'y_'},
'6': {'type': 'float', 'name': 'phi_'}
}, 'name': ''}, # navigationPoints_
}, 'name': 'navigationPoints_'},
# Mapped from List<RoomDataInfo> to List<VenusRoomSweepBean>
'12': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'id'}, # roomId_
'2': {'type': 'bytes', 'name': 'name'}, # roomName_
# '3': {'type': 'bytes', 'name': 'roomTypeId_'},
# '4': {'type': 'bytes', 'name': 'meterialId_'},
'5': {'type': 'int', 'name': 'clean_state'}, # cleanState_
'6': {'type': 'int', 'name': 'room_clean'}, # roomClean_
# '7': {'type': 'int', 'name': 'roomCleanIndex_'},
'1': {'type': 'int', 'name': 'roomId_'},
'2': {'type': 'bytes', 'name': 'roomName_'},
'3': {'type': 'int', 'name': 'roomTypeId_'},
'4': {'type': 'int', 'name': 'meterialId_'},
'5': {'type': 'int', 'name': 'cleanState_'},
'6': {'type': 'int', 'name': 'roomClean_'},
'7': {'type': 'int', 'name': 'roomCleanIndex_'},
# Mapped from List<DevicePointInfo> to List<VenusDeviceAreaBean.RoomPoint>
'8': {'type': 'message', 'message_typedef': {
'1': {'type': 'float', 'name': 'x_'},
'2': {'type': 'float', 'name': 'y_'}
}, 'name': 'roomNamePost_'}
}, 'name': ''}, # error when using roomDataInfo_
}, 'name': 'roomNamePost_'},
# Mapped from CleanPerferenceDataInfo to VenusCleanPreferenceBean
'9': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'cleanMode_'},
'2': {'type': 'int', 'name': 'waterLevel_'},
'3': {'type': 'int', 'name': 'windPower_'},
'4': {'type': 'int', 'name': 'twiceClean_'},
}, 'name': 'cleanPerfer_'}
}, 'name': ''}, # roomDataInfo_
# Mapped from DeviceRoomMatrix to VenusRoomMatrixBean
'13': {'type': 'message', 'message_typedef': {
'1': {'type': 'bytes', 'name': 'matrix_'}
}, 'name': 'roomMatrix_'},
# Mapped from List<DeviceRoomChainDataInfo> to List<VenusRoomChainBean>
'14': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'room'},
'1': {'type': 'int', 'name': 'roomId_'},
# Mapped from List<DeviceChainPointDataInfo> to List<VenusRoomChainBean.RoomChainPoint>
'2': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'x_'},
'2': {'type': 'int', 'name': 'y_'},
'3': {'type': 'int', 'name': 'value_'}
}, 'name': ''} # error when using points_
}, 'name': ''} # error when using roomChain_
# 15: objects
# 16: furnitureInfo
# 17: houseInfos
# 18: backupAreas
}, 'name': 'points_'}
}, 'name': 'roomChain_'},
# Mapped from List<ObjectDataInfo> to List<VenusObjectIdentifyBean>
'15': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'objectId_'},
'2': {'type': 'int', 'name': 'objectTypeId_'},
'3': {'type': 'bytes', 'name': 'objectName_'},
'4': {'type': 'int', 'name': 'confirm_'},
'5': {'type': 'float', 'name': 'x_'},
'6': {'type': 'float', 'name': 'y_'},
'7': {'type': 'bytes', 'name': 'url_'},
}, 'name': 'objects_'},
# Mapped from List<FurnitureDataInfo> to List<VenusFurnitureBean>
'16': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'id_'},
'2': {'type': 'int', 'name': 'typeId_'},
# Mapped from List<DevicePointInfo> to List<VenusDeviceAreaBean.RoomPoint>
'3': {'type': 'message', 'message_typedef': {
'1': {'type': 'float', 'name': 'x_'},
'2': {'type': 'float', 'name': 'y_'}
}, 'name': 'points_'},
'4': {'type': 'bytes', 'name': 'url_'},
'5': {'type': 'int', 'name': 'status_'},
}, 'name': 'furnitureInfo_'},
# Mapped from List<HouseInfo> to List<VenusHouseBean>
'17': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'id_'},
'2': {'type': 'bytes', 'name': 'name_'},
'3': {'type': 'int', 'name': 'curMapCount_'},
'4': {'type': 'int', 'name': 'maxMapSize_'},
# Mapped from List<AllMapInfo> to List<VenusMapIdAndNameBean>
'5': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'mapHeadId_'},
'2': {'type': 'bytes', 'name': 'mapName_'}
}, 'name': 'maps_'},
}, 'name': 'houseInfos_'},
# Mapped from List<DeviceAreaDataInfo> to List<VenusDeviceAreaBean>
'18': {'type': 'message', 'message_typedef': {
'1': {'type': 'int', 'name': 'status_'},
'2': {'type': 'int', 'name': 'type_'},
'3': {'type': 'int', 'name': 'areaIndex_'},
# Mapped from List<DevicePointInfo> to List<VenusDeviceAreaBean.RoomPoint>
'4': {'type': 'message', 'message_typedef': {
'1': {'type': 'float', 'name': 'x_'},
'2': {'type': 'float', 'name': 'y_'}
}, 'name': 'points_'},
}, 'name': 'backupAreas_'},
}

@property
Expand Down Expand Up @@ -363,9 +464,9 @@ def rooms(self) -> Optional[Sequence[VacuumMapRoom]]:
@property
def navigation_points(self) -> Optional[Sequence[VacuumMapNavigationPoint]]:
map_data = self.parse_blob(blob=self._blob)
if 'historyPose_' in map_data:
if 'historyPose_' in map_data and 'points' in map_data['historyPose_']:
return [VacuumMapNavigationPoint(**points) for points in map_data['historyPose_']['points']]
if '6' in map_data:
if '6' in map_data and '2' in map_data['6']:
return [VacuumMapNavigationPoint(**points) for points in map_data['6']['2']]

def parse_blob(self, blob: str) -> dict:
Expand Down Expand Up @@ -394,16 +495,84 @@ def parse_blob(self, blob: str) -> dict:

map = json.loads(map)
for key, value in map.items():
self._logger.debug(f"key: {key}")
self._logger.debug(f" type: {value.__class__}")
self._logger.info(f"key: {key}")
self._logger.info(f" type: {value.__class__}")
if isinstance(value, (list, dict)):
self._logger.debug(f" count: {len(value)}")
self._logger.info(f" count: {len(value)}")

return map
except (binascii.Error, zlib.error) as e:
raise WyzeObjectFormationError(f"encountered an error parsing map blob {e}")


class VacuumMapSummary(JsonObject):
"""
A vacuum map summary.
"""

@property
def attributes(self) -> Set[str]:
return {
"current_map",
"img_url",
"latest_area_point_list",
"map_id",
"room_info_list",
"user_map_name",
}

def __init__(
self,
*,
current_map: bool = False,
img_url: str = None,
map_id: int = None,
user_map_name: str = None,
**others: dict
):
self._current_map = current_map if current_map else self._extract_attribute('current_map', others)
self._img_url = img_url if img_url else self._extract_attribute('img_url', others)
self._map_id = map_id if map_id else self._extract_attribute('map_id', others)
self._user_map_name = user_map_name if user_map_name else self._extract_attribute('user_map_name', others)
self._room_info_list = None
self._latest_area_point_list = None
latest_area_point_list = self._extract_attribute('latest_area_point_list', others)
if latest_area_point_list:
if not isinstance(latest_area_point_list, (list, Tuple)):
latest_area_point_list = [latest_area_point_list]
self._latest_area_point_list = [VacuumMapPoint(x=point['point_x'], y=point['point_y']) for point in latest_area_point_list]
room_info_list = self._extract_attribute('room_info_list', others)
if room_info_list:
if not isinstance(room_info_list, (list, Tuple)):
room_info_list = [room_info_list]
self._room_info_list = [VacuumMapRoom(id=room['room_id'], name=room['room_name']) for room in room_info_list]
show_unknown_key_warning(self, others)

@property
def is_current(self) -> bool:
return False if self._current_map is None else self._current_map

@property
def img_url(self) -> str:
return self._img_url

@property
def id(self) -> int:
return self._map_id

@property
def name(self) -> int:
return self._user_map_name

@property
def rooms(self) -> Optional[Sequence[VacuumMapRoom]]:
return None if not self._room_info_list else self._room_info_list

@property
def latest_points(self) -> Optional[Sequence[VacuumMapPoint]]:
return None if not self._latest_area_point_list else self._latest_area_point_list


class VacuumSweepRecord(JsonObject):
"""
A vacuum sweep record.
Expand Down
Loading

0 comments on commit 233d043

Please sign in to comment.