diff --git a/pyacaia_async/acaiascale.py b/pyacaia_async/acaiascale.py index 996fd0d..4bcc7fe 100644 --- a/pyacaia_async/acaiascale.py +++ b/pyacaia_async/acaiascale.py @@ -1,4 +1,5 @@ """Client to interact with Acaia scales.""" + from __future__ import annotations import asyncio @@ -6,7 +7,8 @@ import time from collections.abc import Awaitable, Callable -from typing import Any + +from dataclasses import dataclass from bleak import BleakClient, BleakGATTCharacteristic, BLEDevice from bleak.exc import BleakDeviceNotFoundError, BleakError @@ -18,18 +20,28 @@ OLD_STYLE_CHAR_ID, ) from .exceptions import AcaiaDeviceNotFound, AcaiaError -from .const import BATTERY_LEVEL, GRAMS, WEIGHT, UNITS +from .const import UnitMass from .decode import Message, Settings, decode from .helpers import encode, encode_id, encode_notification_request _LOGGER = logging.getLogger(__name__) +@dataclass(kw_only=True) +class AcaiaData: + """Data class for Acaia scale data.""" + + battery_level: int + weight: float + units: UnitMass + + class AcaiaScale: """Representation of an Acaia scale.""" _default_char_id = DEFAULT_CHAR_ID _notify_char_id = NOTIFY_CHAR_ID + _msg_types = { "tare": encode(4, [0]), "startTimer": encode(13, [0, 0]), @@ -42,28 +54,30 @@ class AcaiaScale: def __init__( self, - mac: str | None = None, + mac: str, is_new_style_scale: bool = True, notify_callback: Callable[[], None] | None = None, ) -> None: """Initialize the scale.""" - self._mac = mac self._is_new_style_scale = is_new_style_scale - self._client: BleakClient | None = None - self._connected = False - self._disconnecting = False - self._last_disconnect_time: float | None = None - self._timer_running = False + self._client = BleakClient( + address_or_ble_device=mac, + disconnected_callback=self._device_disconnected_callback, + ) + self.connected = False + self.timer_running = False + self.heartbeat_task: asyncio.Task | None = None + self.process_queue_task: asyncio.Task | None = None + self.last_disconnect_time: float | None = None + self._timestamp_last_command: float | None = None self._timer_start: float | None = None self._timer_stop: float | None = None - self._data: dict[str, Any] = {BATTERY_LEVEL: None, UNITS: GRAMS, WEIGHT: 0.0} + self._data = AcaiaData(battery_level=0, weight=0.0, units=UnitMass.GRAMS) self._queue: asyncio.Queue = asyncio.Queue() - self._heartbeat_task: asyncio.Task | None = None - self._process_queue_task: asyncio.Task | None = None self._msg_types["auth"] = encode_id(is_pyxis_style=is_new_style_scale) @@ -76,21 +90,10 @@ def __init__( @property def mac(self) -> str: """Return the mac address of the scale in upper case.""" - assert self._mac - return self._mac.upper() - - @property - def timer_running(self) -> bool: - """Return whether the timer is running.""" - return self._timer_running - - @property - def connected(self) -> bool: - """Return whether the scale is connected.""" - return self._connected + return self._client.address.upper() @property - def data(self) -> dict[str, Any]: + def data(self) -> AcaiaData: """Return the data of the scale.""" return self._data @@ -100,39 +103,33 @@ async def create( mac: str | None = None, ble_device: BLEDevice | None = None, is_new_style_scale: bool = True, - callback: Callable[[BleakGATTCharacteristic, bytearray], Awaitable[None] | None] - | None = None, + callback: ( + Callable[[BleakGATTCharacteristic, bytearray], Awaitable[None] | None] + | None + ) = None, ) -> AcaiaScale: """Create a new scale.""" - self = cls(mac, is_new_style_scale) if ble_device: + self = cls("", is_new_style_scale) self._client = BleakClient( address_or_ble_device=ble_device, disconnected_callback=self._device_disconnected_callback, ) elif mac: - self._client = BleakClient( - address_or_ble_device=mac, - disconnected_callback=self._device_disconnected_callback, - ) + self = cls(mac, is_new_style_scale) else: raise ValueError("Either mac or bleDevice must be specified") await self.connect(callback) return self - @property - def msg_types(self) -> dict: - """Return the message types.""" - return self._msg_types - @property def timer(self) -> int: """Return the current timer value in seconds.""" if self._timer_start is None: return 0 - if self._timer_running: + if self.timer_running: return int(time.time() - self._timer_start) if self._timer_stop is None: return 0 @@ -141,9 +138,13 @@ def timer(self) -> int: def _device_disconnected_callback(self, client: BleakClient) -> None: """Callback for device disconnected.""" - _LOGGER.warning("Scale with MAC %s disconnected", self.mac) - self._connected = False - self._last_disconnect_time = time.time() + + _LOGGER.debug( + "Scale with address %s disconnected through disconnect callback", + client.address, + ) + self.connected = False + self.last_disconnect_time = time.time() if self._notify_callback: self._notify_callback() @@ -157,34 +158,43 @@ def new_client_from_ble_device(self, ble_device: BLEDevice) -> None: async def _write_msg(self, char_id: str, payload: bytearray) -> None: """wrapper for writing to the device.""" try: - if not self._connected: + if not self.connected: return - assert self._client - _LOGGER.debug("Writing to scale %s", self.mac) + await self._client.write_gatt_char(char_id, payload) self._timestamp_last_command = time.time() - except (BleakDeviceNotFoundError, BleakError, TimeoutError) as ex: - if ( - self._connected - ): # might have been disconnected by the disconnected_callback in the meantime - self._connected = False - self._last_disconnect_time = time.time() - if isinstance(ex, BleakDeviceNotFoundError): - raise AcaiaDeviceNotFound("Device not found") from ex - raise AcaiaError("Error writing to device") from ex - - async def _process_queue(self) -> None: + except BleakDeviceNotFoundError as ex: + self.connected = False + raise AcaiaDeviceNotFound("Device not found") from ex + except BleakError as ex: + self.connected = False + raise AcaiaError("Error writing to device") from ex + except TimeoutError as ex: + self.connected = False + raise AcaiaError("Timeout writing to device") from ex + except Exception as ex: + self.connected = False + raise AcaiaError("Unknown error writing to device") from ex + + def async_empty_queue_and_cancel_tasks(self) -> None: + """Empty the queue.""" + + while not self._queue.empty(): + self._queue.get_nowait() + self._queue.task_done() + + if self.heartbeat_task and not self.heartbeat_task.done(): + self.heartbeat_task.cancel() + + if self.process_queue_task and not self.process_queue_task.done(): + self.process_queue_task.cancel() + + async def process_queue(self) -> None: """Task to process the queue in the background.""" while True: try: - if not self._connected: - while not self._queue.empty(): - # empty the queue - self._queue.get_nowait() - self._queue.task_done() - return - - if self._disconnecting and self._queue.empty(): + if not self.connected: + self.async_empty_queue_and_cancel_tasks() return char_id, payload = await self._queue.get() @@ -193,160 +203,177 @@ async def _process_queue(self) -> None: await asyncio.sleep(0.1) except asyncio.CancelledError: + self.connected = False return except (AcaiaDeviceNotFound, AcaiaError) as ex: - if ( - self.connected - ): # might have been disconnected by the disconnected_callback in the meantime - _LOGGER.warning("Error writing to device: %s", ex) + self.connected = False + _LOGGER.debug("Error writing to device: %s", ex) return async def connect( self, - callback: Callable[[BleakGATTCharacteristic, bytearray], Awaitable[None] | None] - | None = None, + callback: ( + Callable[[BleakGATTCharacteristic, bytearray], Awaitable[None] | None] + | None + ) = None, + setup_tasks: bool = True, ) -> None: - """Initiate connection to the scale""" - if not self._client: - raise AcaiaError("Client not initialized") + """Connect the bluetooth client.""" + if self.connected: return - if self._last_disconnect_time and self._last_disconnect_time > ( - time.time() - 15 - ): + + if self.last_disconnect_time and self.last_disconnect_time > (time.time() - 15): _LOGGER.debug( - "Scale has recently been disconnected, waiting 15 seconds before reconnecting", - self.mac, + "Scale has recently been disconnected, waiting 15 seconds before reconnecting" ) return try: - try: - await self._client.connect() - except (BleakError, TimeoutError) as ex: - _LOGGER.debug("Error during connecting to device: %s", ex) - raise AcaiaError("Error during connecting to device") from ex - - self._connected = True - _LOGGER.debug("Connected to Acaia Scale") - - if callback is None: - callback = self.on_bluetooth_data_received - - try: - await self._client.start_notify(self._notify_char_id, callback) - await asyncio.sleep(0.5) - except BleakError as ex: - _LOGGER.debug("Error subscribing to notifications: %s", ex) - raise AcaiaError("Error subscribing to notifications") from ex + await self._client.connect() + except BleakError as ex: + msg = "Error during connecting to device" + _LOGGER.debug("%s: %s", msg, ex) + raise AcaiaError(msg) from ex + except TimeoutError as ex: + msg = "Timeout during connecting to device" + _LOGGER.debug("%s: %s", msg, ex) + raise AcaiaError(msg) from ex + except Exception as ex: + msg = "Unknown error during connecting to device" + _LOGGER.debug("%s: %s", msg, ex) + raise AcaiaError(msg) from ex + + self.connected = True + _LOGGER.debug("Connected to Acaia scale") + + if callback is None: + callback = self.on_bluetooth_data_received + try: + await self._client.start_notify( + char_specifier=self._notify_char_id, + callback=( + self.on_bluetooth_data_received if callback is None else callback + ), + ) + await asyncio.sleep(0.5) + except BleakError as ex: + msg = "Error subscribing to notifications" + _LOGGER.debug("%s: %s", msg, ex) + raise AcaiaError(msg) from ex + try: await self.auth() - await self.send_weight_notification_request() - + if callback is not None: + await self.send_weight_notification_request() except BleakDeviceNotFoundError as ex: raise AcaiaDeviceNotFound("Device not found") from ex + except BleakError as ex: + raise AcaiaError("Error during authentication") from ex - self._setup_tasks() + if setup_tasks: + self._setup_tasks() def _setup_tasks(self) -> None: """Setup background tasks""" - if not self._heartbeat_task or self._heartbeat_task.done(): - self._heartbeat_task = asyncio.create_task( - self._send_heartbeats( - interval=HEARTBEAT_INTERVAL if not self._is_new_style_scale else 1, - new_style_heartbeat=self._is_new_style_scale, - ) - ) - if not self._process_queue_task or self._process_queue_task.done(): - self._process_queue_task = asyncio.create_task(self._process_queue()) + if not self.heartbeat_task or self.heartbeat_task.done(): + self.heartbeat_task = asyncio.create_task(self.send_heartbeats()) + if not self.process_queue_task or self.process_queue_task.done(): + self.process_queue_task = asyncio.create_task(self.process_queue()) async def auth(self) -> None: """Send auth message to scale, if subscribed to notifications returns Settings object""" - await self._queue.put((self._default_char_id, self.msg_types["auth"])) + await self._queue.put((self._default_char_id, self._msg_types["auth"])) async def send_weight_notification_request(self) -> None: """Tell the scale to send weight notifications""" await self._queue.put( - (self._default_char_id, self.msg_types["notificationRequest"]) + (self._default_char_id, self._msg_types["notificationRequest"]) ) - async def _send_heartbeats( - self, interval: int = HEARTBEAT_INTERVAL, new_style_heartbeat: bool = False - ) -> None: + async def send_heartbeats(self) -> None: """Task to send heartbeats in the background.""" while True: try: - if not self._connected or self._disconnecting: + if not self.connected: return _LOGGER.debug("Sending heartbeat") - if new_style_heartbeat: + if self._is_new_style_scale: await self._queue.put( - (self._default_char_id, self.msg_types["auth"]) + (self._default_char_id, self._msg_types["auth"]) ) await self._queue.put( - (self._default_char_id, self.msg_types["heartbeat"]) + (self._default_char_id, self._msg_types["heartbeat"]) ) - if new_style_heartbeat: + if self._is_new_style_scale: await self._queue.put( - (self._default_char_id, self.msg_types["getSettings"]) + (self._default_char_id, self._msg_types["getSettings"]) ) - await asyncio.sleep(interval) + await asyncio.sleep( + HEARTBEAT_INTERVAL if not self._is_new_style_scale else 1, + ) except asyncio.CancelledError: + self.connected = False return except asyncio.QueueFull as ex: + self.connected = False _LOGGER.debug("Error sending heartbeat: %s", ex) return async def disconnect(self) -> None: """Clean disconnect from the scale""" - if not self._client: - return + + _LOGGER.debug("Disconnecting from scale") + self.connected = False + await self._queue.join() try: - _LOGGER.debug("Disconnecting from scale") - self._disconnecting = True - await self._queue.join() await self._client.disconnect() - self._connected = False - _LOGGER.debug("Disconnected from Acaia Scale") except BleakError as ex: _LOGGER.debug("Error disconnecting from device: %s", ex) + else: + _LOGGER.debug("Disconnected from scale") async def tare(self) -> None: """Tare the scale.""" if not self.connected: await self.connect() - await self._queue.put((self._default_char_id, self.msg_types["tare"])) + await self._queue.put((self._default_char_id, self._msg_types["tare"])) async def start_stop_timer(self) -> None: """Start/Stop the timer.""" if not self.connected: await self.connect() - if not self._timer_running: + + if not self.timer_running: _LOGGER.debug('Sending "start" message.') - await self._queue.put((self._default_char_id, self.msg_types["startTimer"])) - self._timer_running = True + await self._queue.put( + (self._default_char_id, self._msg_types["startTimer"]) + ) + self.timer_running = True if not self._timer_start: self._timer_start = time.time() else: _LOGGER.debug('Sending "stop" message.') - await self._queue.put((self._default_char_id, self.msg_types["stopTimer"])) - self._timer_running = False + await self._queue.put((self._default_char_id, self._msg_types["stopTimer"])) + self.timer_running = False self._timer_stop = time.time() async def reset_timer(self) -> None: """Reset the timer.""" if not self.connected: await self.connect() - await self._queue.put((self._default_char_id, self.msg_types["resetTimer"])) + await self._queue.put((self._default_char_id, self._msg_types["resetTimer"])) self._timer_start = None self._timer_stop = None - if self._timer_running: - await self._queue.put((self._default_char_id, self.msg_types["startTimer"])) + if self.timer_running: + await self._queue.put( + (self._default_char_id, self._msg_types["startTimer"]) + ) self._timer_start = time.time() async def on_bluetooth_data_received( @@ -356,16 +383,16 @@ async def on_bluetooth_data_received( msg = decode(data)[0] if isinstance(msg, Settings): - self._data[BATTERY_LEVEL] = msg.battery - self._data[UNITS] = msg.units + self._data.battery_level = msg.battery + self._data.units = UnitMass(msg.units) _LOGGER.debug( "Got battery level %s, units %s", str(msg.battery), str(msg.units) ) elif isinstance(msg, Message): - self._data[WEIGHT] = msg.value + self._data.weight = msg.value or 0 if msg.timer_running is not None: - self._timer_running = msg.timer_running + self.timer_running = msg.timer_running _LOGGER.debug("Got weight %s", str(msg.value)) if self._notify_callback is not None: diff --git a/pyacaia_async/const.py b/pyacaia_async/const.py index 765c8d4..5c15b49 100644 --- a/pyacaia_async/const.py +++ b/pyacaia_async/const.py @@ -1,5 +1,7 @@ """Constants for pyacaia_async.""" + from typing import Final +from enum import StrEnum DEFAULT_CHAR_ID: Final = "49535343-8841-43f4-a8d4-ecbe34729bb3" NOTIFY_CHAR_ID: Final = "49535343-1e4d-4bd9-ba61-23c647249616" @@ -9,9 +11,9 @@ HEARTBEAT_INTERVAL: Final = 5 SCALE_START_NAMES: Final = ["ACAIA", "PYXIS", "LUNAR", "PROCH"] -BATTERY_LEVEL: Final = "battery_level" -WEIGHT: Final = "weight" -UNITS: Final = "units" -GRAMS: Final = "grams" -OUNCE: Final = "ounces" +class UnitMass(StrEnum): + """Unit of mass.""" + + GRAMS = "grams" + OUNCES = "ounces" diff --git a/setup.py b/setup.py index 07457d9..71e75e5 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="pyacaia_async", - version="0.0.11b12", + version="0.0.12b1", description="An async implementation of PyAcaia", long_description=readme, long_description_content_type="text/markdown",