-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbluetooth_test_client.py
126 lines (99 loc) · 4.04 KB
/
bluetooth_test_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import sys
import math
import asyncio
import time
import random
# from bleak import BleakClient, BleakScanner
# from bleak.backends.device import BLEDevice
# from bleak.backends.scanner import AdvertisementData
# from bleak.exc import BleakError
# Disabled real BLE client (NsBleClient), kept as a bare string literal so it
# is never executed. It uses BleakClient, BleakScanner, BLEDevice and
# AdvertisementData, whose imports are commented out above.
# NOTE(review): the text inside the literal carries no indentation, so it
# would have to be re-indented before it could be re-enabled as code.
'''
class NsBleClient:
MANUFACTURER_DATA = {0xffff: b'$tZuFTNvsLGt9U^gsCM!t8$@Fd6'}
SERVICE_UUID = '00000000-b1b6-417b-af10-da8b3de984be'
BRIGHTNESS_UUID = '00000001-b1b6-417b-af10-da8b3de984be'
VOLUME_UUID = '00000002-b1b6-417b-af10-da8b3de984be'
PAUSE_VOLUME_UUID = '10000001-b1b6-417b-af10-da8b3de984be'
LUX_TO_NITS = 1 / math.pi
@staticmethod
def encode(string: str) -> bytes:
return string.encode('utf-8')
@staticmethod
def decode(byte_array: bytearray) -> str:
return byte_array.decode('utf-8')
async def find_device(self) -> BLEDevice:
print('Discovering devices...')
async with BleakScanner() as scanner:
start_time = time.time()
future = asyncio.Future()
detected = set()
def detection_callback(device: BLEDevice, data: AdvertisementData):
if device.address not in detected:
detected.add(device.address)
print(' ', device)
if data.manufacturer_data == self.MANUFACTURER_DATA:
time_used = time.time() - start_time
print(f'Device found after {time_used:.2f} s')
future.set_result(device)
scanner.register_detection_callback(detection_callback)
return await future
def __init__(self):
pass
# self.device = None
# self.client = None
# self.service = None
# self.brightness_characteristic = None
# self.volume_characteristic = None
# self.pause_characteristic = None
async def discover_and_connect(self):
self.device = await self.find_device()
print(f'Connecting to device {self.device}')
self.client = BleakClient(self.device, timeout=100)
await self.client.connect()
# note: due to a bug in Windows connection will fail if we pair with the device
# so never try to pair with it
services = await self.client.get_services()
self.service = services.get_service(self.SERVICE_UUID)
for c in self.service.characteristics:
print()
print(c.uuid, c.description)
byte_array = await self.client.read_gatt_char(c)
print('Read:', self.decode(byte_array))
if c.uuid == self.BRIGHTNESS_UUID:
self.brightness_characteristic = c
if c.uuid == self.VOLUME_UUID:
self.volume_characteristic = c
if c.uuid == self.PAUSE_VOLUME_UUID:
self.pause_characteristic = c
async def get_brightness(self):
raw = await self.client.read_gatt_char(self.brightness_characteristic)
value = self.decode(raw)
return value
async def get_volume(self):
raw = await self.client.read_gatt_char(self.volume_characteristic)
value = self.decode(raw)
return value
async def pause_volume(self):
await self.client.write_gatt_char(self.pause_characteristic, self.encode('1'))
'''
class NsDummyClient:
    """Stand-in client that fakes readings with random numbers.

    Every method is a coroutine that sleeps for a fixed delay; the two
    getters then return a uniformly distributed random value. No hardware
    or Bluetooth library is touched.
    """

    async def discover_and_connect(self) -> None:
        """Pretend to discover and connect by sleeping for five seconds."""
        connect_delay = 5
        await asyncio.sleep(connect_delay)

    async def get_brightness(self) -> float:
        """Return a random brightness in lux, drawn from [0, 20000]."""
        read_delay = 0.2
        await asyncio.sleep(read_delay)
        lux = random.uniform(0, 20000)
        return lux

    async def get_volume(self) -> float:
        """Return a random volume drawn from [0, 1]; the unit is unknown."""
        read_delay = 0.1
        await asyncio.sleep(read_delay)
        level = random.uniform(0, 1)
        return level

    async def pause_volume(self) -> None:
        """Pretend to send the pause command by sleeping for 0.1 seconds."""
        write_delay = 0.1
        await asyncio.sleep(write_delay)
async def test():
    """Connect a dummy client, then print ten brightness readings.

    A one-second pause follows each printed reading.
    """
    client = NsDummyClient()
    await client.discover_and_connect()

    readings_left = 10
    while readings_left > 0:
        reading = await client.get_brightness()
        print(reading)
        await asyncio.sleep(1)
        readings_left -= 1
# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    demo = test()
    asyncio.run(demo)