Skip to content

Commit

Permalink
typing
Browse files Browse the repository at this point in the history
  • Loading branch information
zweckj committed Nov 2, 2023
1 parent bd50eea commit bb68eb6
Show file tree
Hide file tree
Showing 8 changed files with 370 additions and 287 deletions.
223 changes: 116 additions & 107 deletions pyacaia_async/acaiascale.py
Original file line number Diff line number Diff line change
@@ -1,94 +1,119 @@
"""Client to interact with Acaia scales."""
from __future__ import annotations

import asyncio
import logging
import time
from typing import Awaitable, Callable

from bleak import BleakClient, BleakGATTCharacteristic, BLEDevice
from bleak.exc import BleakDeviceNotFoundError, BleakError

from bleak import BleakClient, BLEDevice
from bleak.exc import BleakError, BleakDeviceNotFoundError
from .const import (
DEFAULT_CHAR_ID,
HEARTBEAT_INTERVAL,
NOTIFY_CHAR_ID,
OLD_STYLE_CHAR_ID,
)
from .helpers import encode, encodeId, encodeNotificationRequest
from .exceptions import AcaiaDeviceNotFound, AcaiaError
from .helpers import encode, encode_id, encode_notification_request

_LOGGER = logging.getLogger(__name__)

class AcaiaScale():

def __init__(self, mac: str = None, is_new_style_scale: bool=True):
class AcaiaScale:
"""Representation of an Acaia scale."""

_default_char_id = DEFAULT_CHAR_ID
_notify_char_id = NOTIFY_CHAR_ID
_msg_types = {
"tare": encode(4, [0]),
"startTimer": encode(13, [0, 0]),
"stopTimer": encode(13, [0, 2]),
"resetTimer": encode(13, [0, 1]),
"heartbeat": encode(0, [2, 0]),
"getSettings": encode(6, [0] * 16),
"notificationRequest": encode_notification_request(),
}

def __init__(self, mac: str | None = None, is_new_style_scale: bool = True) -> None:
"""Initialize the scale."""

self._mac = mac
self._is_new_style_scale = is_new_style_scale

self._client = None
self._client: BleakClient | None = None
self._connected = False
self._disconnecting = False
self._timestamp_last_command = None
self._timer_running = False
self._timer_start = None
self._timer_stop = None
self._timestamp_last_command: float | None = None
self._timer_start: float | None = None
self._timer_stop: float | None = None

self._queue = asyncio.Queue()
self._queue: asyncio.Queue = asyncio.Queue()

self._msg_types = {
"tare": encode(4, [0]),
"startTimer": encode(13, [0,0]),
"stopTimer": encode(13, [0,2]),
"resetTimer": encode(13, [0,1]),
"heartbeat": encode(0, [2,0]),
"getSettings": encode(6, [0]*16),
"auth": encodeId(isPyxisStyle=is_new_style_scale),
"notificationRequest": encodeNotificationRequest(),
}
self._msg_types["auth"] = encode_id(is_pyxis_style=is_new_style_scale)

if not is_new_style_scale:
# for old style scales, the default char id is the same as the notify char id
DEFAULT_CHAR_ID = NOTIFY_CHAR_ID = OLD_STYLE_CHAR_ID
self._default_char_id = self._notify_char_id = OLD_STYLE_CHAR_ID

@classmethod
async def create(cls, mac: str = None, bleDevice: BLEDevice = None, is_new_style_scale: bool=True, callback = None) -> AcaiaScale:
"""Create a new scale."""
async def create(
cls,
mac: str | None = None,
ble_device: BLEDevice | None = None,
is_new_style_scale: bool = True,
callback: Callable[[BleakGATTCharacteristic, bytearray], Awaitable[None] | None]
| None = None,
) -> AcaiaScale:
"""Create a new scale."""
self = cls(mac, is_new_style_scale)

if bleDevice:
self._client = BleakClient(bleDevice)
if ble_device:
self._client = BleakClient(ble_device)
elif mac:
self._client = BleakClient(mac)
else:
raise ValueError("Either mac or bleDevice must be specified")

await self.connect(callback)
asyncio.create_task(self._send_heartbeats(interval=HEARTBEAT_INTERVAL if not is_new_style_scale else 1, new_style_heartbeat=is_new_style_scale))
asyncio.create_task(
self._send_heartbeats(
interval=HEARTBEAT_INTERVAL if not is_new_style_scale else 1,
new_style_heartbeat=is_new_style_scale,
)
)
asyncio.create_task(self._process_queue())
return self

@property
def msg_types(self) -> dict:
"""Return the message types."""
return self._msg_types

@property
def timer(self) -> int:
"""Return the current timer value in seconds."""
if self._timer_start is None:
return 0
if self._timer_running:
return int(time.time() - self._timer_start)
else:
return int(self._timer_stop - self._timer_start)

if self._timer_stop is None:
return 0

def new_client_from_ble_device(self, BLED: BLEDevice) -> None:
""" Create a new client from a BLEDevice, used for Home Assistant"""
self._client = BleakClient(BLED)
return int(self._timer_stop - self._timer_start)

def new_client_from_ble_device(self, ble_device: BLEDevice) -> None:
"""Create a new client from a BLEDevice, used for Home Assistant"""
self._client = BleakClient(ble_device)

async def _write_msg(self, char_id: str, payload: bytearray) -> None:
""" wrapper for writing to the device."""
"""wrapper for writing to the device."""
try:
if not self._connected:
return

assert self._client
await self._client.write_gatt_char(char_id, payload)
self._timestamp_last_command = time.time()
except BleakDeviceNotFoundError as ex:
Expand All @@ -100,10 +125,9 @@ async def _write_msg(self, char_id: str, payload: bytearray) -> None:
except Exception as ex:
self._connected = False
raise AcaiaError("Unknown error writing to device") from ex


async def _process_queue(self) -> None:
""" Task to process the queue in the background. """
"""Task to process the queue in the background."""
while True:
try:
if not self._connected:
Expand All @@ -112,31 +136,36 @@ async def _process_queue(self) -> None:
self._queue.get_nowait()
self._queue.task_done()
return

if self._disconnecting and self._queue.empty():
return

char_id, payload = await self._queue.get()
await self._write_msg(char_id, payload)
self._queue.task_done()
await asyncio.sleep(0.1)

except asyncio.CancelledError:
return
except Exception as ex:
except (AcaiaDeviceNotFound, AcaiaError) as ex:
_LOGGER.debug("Error writing to device: %s", ex)
return


async def connect(self, callback = None) -> None:
""" Initiate connection to the scale """
async def connect(
self,
callback: Callable[[BleakGATTCharacteristic, bytearray], Awaitable[None] | None]
| None = None,
) -> None:
"""Initiate connection to the scale"""
if not self._client:
raise AcaiaError("Client not initialized")
try:
await self._client.connect()
self._connected = True
_LOGGER.debug("Connected to Acaia Scale.")
_LOGGER.debug("Connected to Acaia Scale")

if callback is not None:
await self._client.start_notify(NOTIFY_CHAR_ID, callback)
await self._client.start_notify(self._notify_char_id, callback)
await asyncio.sleep(0.5)

await self.auth()
Expand All @@ -148,106 +177,86 @@ async def connect(self, callback = None) -> None:
raise AcaiaDeviceNotFound("Device not found") from ex
except BleakError as ex:
raise AcaiaError("Error connecting to device") from ex


async def auth(self) -> None:
""" Send auth message to scale, if subscribed to notifications returns Settings object """
await self._queue.put((
DEFAULT_CHAR_ID,
self.msg_types["auth"]
))
"""Send auth message to scale, if subscribed to notifications returns Settings object"""
await self._queue.put((self._default_char_id, self.msg_types["auth"]))

async def send_weight_notification_request(self) -> None:
""" Tell the scale to send weight notifications """

await self._queue.put((
DEFAULT_CHAR_ID,
self.msg_types["notificationRequest"]
))
"""Tell the scale to send weight notifications"""

await self._queue.put(
(self._default_char_id, self.msg_types["notificationRequest"])
)

async def _send_heartbeats(self, interval:int=HEARTBEAT_INTERVAL, new_style_heartbeat:bool=False) -> None:
""" Task to send heartbeats in the background. """
async def _send_heartbeats(
self, interval: int = HEARTBEAT_INTERVAL, new_style_heartbeat: bool = False
) -> None:
"""Task to send heartbeats in the background."""
while True:
try:
if not self._connected or self._disconnecting:
return
_LOGGER.debug("Sending heartbeat.")

_LOGGER.debug("Sending heartbeat")
if new_style_heartbeat:
await self._queue.put((
DEFAULT_CHAR_ID,
self.msg_types["auth"]
))

await self._queue.put((
DEFAULT_CHAR_ID,
self.msg_types["heartbeat"]
))

await self._queue.put(
(self._default_char_id, self.msg_types["auth"])
)

await self._queue.put(
(self._default_char_id, self.msg_types["heartbeat"])
)

if new_style_heartbeat:
await self._queue.put((
DEFAULT_CHAR_ID,
self.msg_types["getSettings"]
))
await self._queue.put(
(self._default_char_id, self.msg_types["getSettings"])
)
await asyncio.sleep(interval)
except asyncio.CancelledError:
return
except Exception as ex:
except asyncio.QueueFull as ex:
_LOGGER.debug("Error sending heartbeat: %s", ex)
return

async def disconnect(self) -> None:
""" Clean disconnect from the scale """
"""Clean disconnect from the scale"""
if not self._client:
return
try:
_LOGGER.debug("Disconnecting from scale.")
_LOGGER.debug("Disconnecting from scale")
self._disconnecting = True
await self._queue.join()
await self._client.disconnect()
self._connected = False
_LOGGER.debug("Disconnected from Acaia Scale.")
except Exception as ex:
_LOGGER.debug("Disconnected from Acaia Scale")
except BleakError as ex:
_LOGGER.debug("Error disconnecting from device: %s", ex)


async def tare(self) -> None:
await self._queue.put((
DEFAULT_CHAR_ID,
self.msg_types["tare"]
))
"""Tare the scale."""
await self._queue.put((self._default_char_id, self.msg_types["tare"]))


async def startStopTimer(self) -> None:
async def start_stop_timer(self) -> None:
"""Start/Stop the timer."""
if not self._timer_running:
_LOGGER.debug('Sending "start" message.')
await self._queue.put((
DEFAULT_CHAR_ID,
self.msg_types["startTimer"]
))
await self._queue.put((self._default_char_id, self.msg_types["startTimer"]))
self._timer_running = True
if not self._timer_start:
self._timer_start = time.time()
else:
_LOGGER.debug('Sending "stop" message.')
await self._queue.put((
DEFAULT_CHAR_ID,
self.msg_types["stopTimer"]
))
await self._queue.put((self._default_char_id, self.msg_types["stopTimer"]))
self._timer_running = False
self._timer_stop = time.time()


async def resetTimer(self) -> None:
await self._queue.put((
DEFAULT_CHAR_ID,
self.msg_types["resetTimer"]
))
async def reset_timer(self) -> None:
"""Reset the timer."""
await self._queue.put((self._default_char_id, self.msg_types["resetTimer"]))
self._timer_start = None
self._timer_stop = None

if self._timer_running:
await self._queue.put((
DEFAULT_CHAR_ID,
self.msg_types["startTimer"]
))
await self._queue.put((self._default_char_id, self.msg_types["startTimer"]))
self._timer_start = time.time()
16 changes: 10 additions & 6 deletions pyacaia_async/const.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
DEFAULT_CHAR_ID = "49535343-8841-43f4-a8d4-ecbe34729bb3"
NOTIFY_CHAR_ID = "49535343-1e4d-4bd9-ba61-23c647249616"
OLD_STYLE_CHAR_ID = "00002a80-0000-1000-8000-00805f9b34fb"
HEADER1 = 0xef
HEADER2 = 0xdd
HEARTBEAT_INTERVAL = 5
"""Constants for pyacaia_async."""
from typing import Final

DEFAULT_CHAR_ID: Final = "49535343-8841-43f4-a8d4-ecbe34729bb3"
NOTIFY_CHAR_ID: Final = "49535343-1e4d-4bd9-ba61-23c647249616"
OLD_STYLE_CHAR_ID: Final = "00002a80-0000-1000-8000-00805f9b34fb"
HEADER1: Final = 0xEF
HEADER2: Final = 0xDD
HEARTBEAT_INTERVAL: Final = 5
SCALE_START_NAMES: Final = ["ACAIA", "PYXIS", "LUNAR", "PROCH"]
Loading

0 comments on commit bb68eb6

Please sign in to comment.