Skip to content

Commit

Permalink
add info to devicesettings, comments, exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
zweckj committed Nov 14, 2024
1 parent 1be6d3d commit 6edb18c
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 14 deletions.
21 changes: 18 additions & 3 deletions aioacaia/acaiascale.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@
NOTIFY_CHAR_ID,
OLD_STYLE_CHAR_ID,
)
from .exceptions import AcaiaDeviceNotFound, AcaiaError, AcaiaMessageTooShort
from .exceptions import (
AcaiaDeviceNotFound,
AcaiaError,
AcaiaMessageError,
AcaiaMessageTooLong,
AcaiaMessageTooShort,
)
from .const import UnitMass
from .decode import Message, Settings, decode
from .helpers import encode, encode_id, encode_notification_request, derive_model_name
Expand All @@ -35,6 +41,8 @@ class AcaiaDeviceState:

battery_level: int
units: UnitMass
beeps: bool = True
auto_off_time: int = 0


class AcaiaScale:
Expand Down Expand Up @@ -405,17 +413,24 @@ async def on_bluetooth_data_received(
_LOGGER.debug("Restored message from previous data: %s", data)

try:
msg = decode(data)[0]
msg, _ = decode(data)
except AcaiaMessageTooShort as ex:
if ex.bytes_recvd[0] != HEADER1 or ex.bytes_recvd[1] != HEADER2:
_LOGGER.debug("Non-header message too short: %s", ex.bytes_recvd)
else:
self._last_short_msg = ex.bytes_recvd
return
except AcaiaMessageTooLong as ex:
_LOGGER.debug("%s: %s", ex.message, ex.bytes_recvd)
except AcaiaMessageError as ex:
_LOGGER.warning("%s: %s", ex.message, ex.bytes_recvd)

if isinstance(msg, Settings):
self._device_state = AcaiaDeviceState(
battery_level=msg.battery, units=UnitMass(msg.units)
battery_level=msg.battery,
units=UnitMass(msg.units),
beeps=msg.beep_on,
auto_off_time=msg.auto_off,
)
_LOGGER.debug(
"Got battery level %s, units %s", str(msg.battery), str(msg.units)
Expand Down
17 changes: 13 additions & 4 deletions aioacaia/decode.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
"""Message decoding functions, taken from pyacaia."""

import logging
from dataclasses import dataclass

from bleak import BleakGATTCharacteristic

from .const import HEADER1, HEADER2
from .exceptions import AcaiaMessageTooShort
from .exceptions import AcaiaMessageError, AcaiaMessageTooLong, AcaiaMessageTooShort

_LOGGER = logging.getLogger(__name__)


@dataclass
class Message:
"""Representation of a message from the scale."""

Expand All @@ -26,9 +28,12 @@ def __init__(self, msg_type: int, payload: bytearray | list[int]) -> None:
str(msg_type),
payload,
)

# weight message
if self.msg_type == 5:
self.value = self._decode_weight(payload)

# heartbeat response
elif self.msg_type == 11:
if payload[2] == 5:
self.value = self._decode_weight(payload[3:])
Expand All @@ -38,10 +43,12 @@ def __init__(self, msg_type: int, payload: bytearray | list[int]) -> None:
"heartbeat response (weight: %s, time: %s)", self.value, self.time
)

# time message
elif self.msg_type == 7:
self.time = self._decode_time(payload)
_LOGGER.debug("timer: %s", self.time)

# button message
elif self.msg_type == 8:
if payload[0] == 0 and payload[1] == 5:
self.button = "tare"
Expand Down Expand Up @@ -71,7 +78,7 @@ def __init__(self, msg_type: int, payload: bytearray | list[int]) -> None:
_LOGGER.debug("unknownbutton %s", str(payload))

else:
_LOGGER.debug("message: %s, payload %s", msg_type, payload)
raise AcaiaMessageError(bytearray(payload), "Unknown message type")

def _decode_weight(self, weight_payload):
value = ((weight_payload[1] & 0xFF) << 8) + (weight_payload[0] & 0xFF)
Expand All @@ -98,6 +105,7 @@ def _decode_time(self, time_payload):
return value


@dataclass
class Settings:
"""Representation of the settings from the scale."""

Expand Down Expand Up @@ -166,8 +174,9 @@ def decode(byte_msg: bytearray):
msg_end = msg_start + byte_msg[msg_start + 3] + 5

if msg_end > len(byte_msg):
_LOGGER.debug("Message too long %s", byte_msg)
return (None, byte_msg)
if byte_msg[i] != HEADER1 or byte_msg[1] != HEADER2:
raise AcaiaMessageError(byte_msg, "Long message without headers")
raise AcaiaMessageTooLong(byte_msg)

if msg_start > 0:
_LOGGER.debug("Ignoring %s bytes before header", i)
Expand Down
25 changes: 19 additions & 6 deletions aioacaia/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,24 @@ class AcaiaUnknownDevice(Exception):
"""Exception for unknown devices."""


class AcaiaMessageTooShort(Exception):
"""Exception for messages that are too short."""
class AcaiaMessageError(Exception):
"""Exception for message errors."""

def __init__(
self, bytes_recvd: bytearray, message: str = "Message too short"
) -> None:
super().__init__(message)
def __init__(self, bytes_recvd: bytearray, message: str) -> None:
super().__init__()
self.message = message
self.bytes_recvd = bytes_recvd


class AcaiaMessageTooShort(AcaiaMessageError):
"""Exception for messages that are too short."""

def __init__(self, bytes_recvd: bytearray) -> None:
super().__init__(bytes_recvd, "Message too short")


class AcaiaMessageTooLong(AcaiaMessageError):
"""Exception for messages that are too long."""

def __init__(self, bytes_recvd: bytearray) -> None:
super().__init__(bytes_recvd, "Message too long")
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="aioacaia",
version="0.1.5",
version="0.1.6",
description="An async implementation of PyAcaia",
long_description=readme,
long_description_content_type="text/markdown",
Expand Down

0 comments on commit 6edb18c

Please sign in to comment.