Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new backend: Silicon Labs BGAPI #1194

Draft
wants to merge 18 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Added
* Added ``BleakScanner.find_device_by_name()`` class method.
* Added optional command line argument to use debug log level to all applicable examples.
* Make sure the disconnect monitor task is properly cancelled on the BlueZ client.
* Added new backend, for Silicon Labs NCP devices implementing BGAPI

Changed
-------
Expand Down
55 changes: 55 additions & 0 deletions bleak/assigned_numbers.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,58 @@ class AdvertisementDataType(IntEnum):
SERVICE_DATA_UUID128 = 0x21

MANUFACTURER_SPECIFIC_DATA = 0xFF


AppearanceCategories = {
0x000: "Unknown",
0x001: "Phone",
0x002: "Computer",
0x003: "Watch",
0x004: "Clock",
0x005: "Display",
0x006: "Remote Control",
0x007: "Eye-glasses",
0x008: "Tag",
0x009: "Keyring",
0x00A: "Media Player",
0x00B: "Barcode Scanner",
0x00C: "Thermometer",
0x00D: "Heart Rate Sensor",
0x00E: "Blood Pressure",
0x00F: "Human Interface Device",
0x010: "Glucose Meter",
0x011: "Running Walking Sensor",
0x012: "Cycling",
0x013: "Control Device",
0x014: "Network Device",
0x015: "Sensor",
0x016: "Light Fixtures",
0x017: "Fan",
0x018: "HVAC",
0x019: "Air Conditioning",
0x01A: "Humidifier",
0x01B: "Heating",
0x01C: "Access Control",
0x01D: "Motorized Device",
0x01E: "Power Device",
0x01F: "Light Source",
0x020: "Window Covering",
0x021: "Audio Sink",
0x022: "Audio Source",
0x023: "Motorized Vehicle",
0x024: "Domestic Appliance",
0x025: "Wearable Audio Device",
0x026: "Aircraft",
0x027: "AV Equipment",
0x028: "Display Equipment",
0x029: "Hearing aid",
0x02A: "Gaming",
0x02B: "Signage",
0x031: "Pulse Oximeter",
0x032: "Weight Scale",
0x033: "Personal Mobility Device",
0x034: "Continuous Glucose Monitor",
0x035: "Insulin Pump",
0x036: "Medication Delivery",
0x051: "Outdoor Sports Activity",
}
129 changes: 129 additions & 0 deletions bleak/backends/bgapi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""
Backend targetting Silicon Labs devices running "NCP" firmware, using the BGAPI via pyBGAPI
See: https://pypi.org/project/pybgapi/
"""
import asyncio
import logging
import typing

import bgapi

class BgapiHandler():
def __init__(self, adapter, xapi):
self.log = logging.getLogger(f"BgapiHandler-{adapter}")
self.log.info("Creating an explicit handler")
self._loop = asyncio.get_running_loop()

self.lib = bgapi.BGLib(
bgapi.SerialConnector(adapter, baudrate=115200),
xapi,
self.bgapi_evt_handler,
)
self._scan_handlers: typing.List[typing.Callable] = list()
self._conn_handlers: typing.Dict[int, typing.Callable] = {}
self._is_scanning = False
self.scan_phy = None
self.scan_parameters = None
self.scan_discover_mode = None
# We should make _this_ layer do a .lib.open() straight away, so it can call reset...?
# then it can manage start_scan
self.lib.open()
self.is_booted = asyncio.Event()
self.lib.bt.system.reset(0)
# block other actions here til we get the booted message?

def bgapi_evt_handler(self, evt):
"""
THIS RUNS IN THE BGLIB THREAD!
and because of this, we can't call commands from here ourself, we'd have to
recall them back onto the other thread?
"""
if evt == "bt_evt_system_boot":
self.log.debug(
"NCP booted: %d.%d.%db%d hw:%d hash: %x",
evt.major,
evt.minor,
evt.patch,
evt.build,
evt.hw,
evt.hash,
)
# TODO - save boot info? any purpose?
self._loop.call_soon_threadsafe(self.is_booted.set)

#self.log.debug("Internal event received, sending to %d subs: %s", len(self._scan_handlers), evt)
if hasattr(evt, "connection"):
handler = self._conn_handlers.get(evt.connection, None)
if handler:
handler(evt)
else:
if evt.reason == 4118:
# disconnected at local request, and our client has gone away
# without waiting. waiting for this would be possible, but seems unnecessary
pass
else:
self.log.warning("Connection event with no matching handler!: %s", evt)
else:
# CAUTION: is this too restrictive? (only ending events without connection details)
for x in self._scan_handlers:
x(evt)

#self.log.debug("int event finished")

async def start_scan(self, phy, scanning_mode, discover_mode, handler: typing.Callable):
"""
:return:
"""
await self.is_booted.wait()
self._scan_handlers.append(handler)
if self._is_scanning:
# TODO - If params are the same, return, if params are different....
# reinitialize with new ones? we're still by definition one app,
# we must assume cooperative.
self.log.debug("scanning already in process, skipping")
return
self._is_scanning = True
self.log.debug("requesting bgapi to start scanning")
self.scan_phy = phy
self.scan_parameters = scanning_mode
self.scan_discover_mode = discover_mode
self.lib.bt.scanner.set_parameters(self.scan_parameters, 0x10, 0x10)
self.lib.bt.scanner.start(self.scan_phy, self.scan_discover_mode)

async def stop_scan(self, handler: typing.Callable):
self._scan_handlers.remove(handler)
if len(self._scan_handlers) == 0:
self.log.info("Stopping scanners, all listeners have exited")
self.lib.bt.scanner.stop()
self._is_scanning = False

async def connect(self, address, address_type, phy, handler: typing.Callable):
await self.is_booted.wait()
_, ch = self.lib.bt.connection.open(address, address_type, phy)
self._conn_handlers[ch] = handler
self.log.debug("Attempting connection to addr: %s, assigned ch: %d", address, ch)
return ch

async def disconnect(self, ch):
self.log.debug("attempting to disconnect ch: %d", ch)
self.lib.bt.connection.close(ch)
# the user won't get a final event, but they're exiting anyway, they don't care
self._conn_handlers.pop(ch)


class BgapiRegistry:
"""
Holds lib/connector instances based on the adapter address.
Only allows one, so you can have multiple higher level objects...
"""
registry = {}


@classmethod
def get(cls, adapter, xapi):
x = cls.registry.get(adapter)
if x:
return x
x = BgapiHandler(adapter, xapi)
cls.registry[adapter] = x
return x
94 changes: 94 additions & 0 deletions bleak/backends/bgapi/characteristic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import collections
from typing import List, Union
from uuid import UUID

from ..characteristic import BleakGATTCharacteristic, GattCharacteristicsFlags
from ..descriptor import BleakGATTDescriptor

PartialCharacteristic = collections.namedtuple(
"PartialCharacteristic", ["uuid", "handle", "properties"]
)


class BleakGATTCharacteristicBGAPI(BleakGATTCharacteristic):
"""GATT Characteristic implementation for the Silicon Labs BGAPI backend"""

def __init__(
self,
obj: PartialCharacteristic,
service_uuid: str,
service_handle: int,
max_write_without_response_size: int,
):
super(BleakGATTCharacteristicBGAPI, self).__init__(
obj, max_write_without_response_size
)
self.__uuid = self.obj.uuid
self.__handle = self.obj.handle
self.__service_uuid = service_uuid
self.__service_handle = service_handle
self.__descriptors = []
self.__notification_descriptor = None

self.__properties = [
x.name for x in GattCharacteristicsFlags if x.value & obj.properties > 0
]

@property
def service_uuid(self) -> str:
"""The uuid of the Service containing this characteristic"""
return self.__service_uuid

@property
def service_handle(self) -> int:
"""The integer handle of the Service containing this characteristic"""
return int(self.__service_handle)

@property
def handle(self) -> int:
"""The handle of this characteristic"""
return self.__handle

@property
def uuid(self) -> str:
"""The uuid of this characteristic"""
return self.__uuid

@property
def properties(self) -> List[str]:
"""Properties of this characteristic"""
return self.__properties

@property
def descriptors(self) -> List[BleakGATTDescriptor]:
"""List of descriptors for this service"""
return self.__descriptors

def get_descriptor(
self, specifier: Union[str, UUID]
) -> Union[BleakGATTDescriptor, None]:
"""Get a descriptor by UUID (str or uuid.UUID)"""

matches = [
descriptor
for descriptor in self.descriptors
if descriptor.uuid == str(specifier)
]
if len(matches) == 0:
return None
return matches[0]

def add_descriptor(self, descriptor: BleakGATTDescriptor):
"""Add a :py:class:`~BleakGATTDescriptor` to the characteristic.

Should not be used by end user, but rather by `bleak` itself.
"""
self.__descriptors.append(descriptor)
# FIXME - you probably still need this!
# if descriptor.uuid == defs.CLIENT_CHARACTERISTIC_CONFIGURATION_UUID:
# self.__notification_descriptor = descriptor

@property
def notification_descriptor(self) -> BleakGATTDescriptor:
"""The notification descriptor. Mostly needed by `bleak`, not by end user"""
return self.__notification_descriptor
Loading