Skip to content

Commit

Permalink
examples/passive_scan: Add example using passive scanning mode
Browse files Browse the repository at this point in the history
In case of BlueZ, using passive scanning mode is not simply passing
"passive" as a ``scan_mode`` parameter, but requires more arguments.
This example showcases that.
  • Loading branch information
bojanpotocnik committed Dec 5, 2022
1 parent 106bbca commit aaa5788
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ Added
* Added optional command line argument to use debug log level to all applicable examples.
* Make sure the disconnect monitor task is properly cancelled on the BlueZ client.
* Added ``BleakNoPassiveScanError`` exception.
* Make sure that BlueZ Advertisement Monitor is actually registered when passive scanning. Solves #1136.
* Added check to verify that BlueZ Advertisement Monitor was actually registered. Solves #1136.
* Added ``passive_scan.py`` example.

Changed
-------
Expand Down
128 changes: 128 additions & 0 deletions examples/passive_scan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""
Scanner using passive scanning mode
--------------
Example similar to detection_callback.py, but using passive scanning
Updated on 2022-11-24 by bojanpotocnik <[email protected]>
"""
import argparse
import asyncio
import logging
from typing import Optional, List, Dict, Any

import bleak
from bleak import AdvertisementData, BLEDevice, BleakScanner

logger = logging.getLogger(__name__)


def _get_os_specific_scanning_params(
uuids: Optional[List[str]],
rssi: Optional[int] = None,
macos_use_bdaddr: bool = False,
) -> Dict[str, Any]:
def get_bluez_dbus_scanning_params() -> Dict[str, Any]:
from bleak.assigned_numbers import AdvertisementDataType
from bleak.backends.bluezdbus.advertisement_monitor import OrPattern
from bleak.backends.bluezdbus.scanner import (
BlueZScannerArgs,
BlueZDiscoveryFilters,
)

filters = BlueZDiscoveryFilters(
# UUIDs= Added below, because it cannot be None
# RSSI= Added below, because it cannot be None
Transport="le",
DuplicateData=True,
)
if uuids:
filters["UUIDs"] = uuids
if rssi:
filters["RSSI"] = rssi

# or_patterns ar required for BlueZ passive scanning
or_patterns = [
# General Discoverable (peripherals)
OrPattern(0, AdvertisementDataType.FLAGS, b"\x02"),
# BR/EDR Not Supported (BLE peripherals)
OrPattern(0, AdvertisementDataType.FLAGS, b"\x04"),
# General Discoverable, BR/EDR Not Supported (BLE peripherals)
OrPattern(0, AdvertisementDataType.FLAGS, b"\x06"),
# General Discoverable, LE and BR/EDR Capable (Controller), LE and BR/EDR Capable (Host) (computers, phones)
OrPattern(0, AdvertisementDataType.FLAGS, b"\x1A"),
]

return {"bluez": BlueZScannerArgs(filters=filters, or_patterns=or_patterns)}

def get_core_bluetooth_scanning_params() -> Dict[str, Any]:
from bleak.backends.corebluetooth.scanner import CBScannerArgs

return {"cb": CBScannerArgs(use_bdaddr=macos_use_bdaddr)}

return {
"BleakScannerBlueZDBus": get_bluez_dbus_scanning_params,
"BleakScannerCoreBluetooth": get_core_bluetooth_scanning_params,
# "BleakScannerP4Android": get_p4android_scanning_params,
# "BleakScannerWinRT": get_winrt_scanning_params,
}.get(bleak.get_platform_scanner_backend_type().__name__, lambda: {})()


async def scan(args: argparse.Namespace, passive_mode: bool):
def scan_callback(device: BLEDevice, adv_data: AdvertisementData):
logger.info("%s: %r", device.address, adv_data)

async with BleakScanner(
detection_callback=scan_callback,
**_get_os_specific_scanning_params(
uuids=args.services, macos_use_bdaddr=args.macos_use_bdaddr
),
scanning_mode="passive" if passive_mode else "active",
):
await asyncio.sleep(60)


async def main(args: argparse.Namespace):
try:
await scan(args, passive_mode=True)
except bleak.exc.BleakNoPassiveScanError as e:
if args.fallback:
logger.warning(
f"Passive scanning not possible, using active scanning ({e})"
)
await scan(args, passive_mode=False)
else:
logger.error(f"Passive scanning not possible ({e})")


if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)-15s %(name)-8s %(levelname)s: %(message)s",
)

parser = argparse.ArgumentParser()
parser.add_argument(
"--fallback",
action="store_true",
help="fallback to active scanning mode if passive mode is not possible",
)
parser.add_argument(
"--macos-use-bdaddr",
action="store_true",
help="when true use Bluetooth address instead of UUID on macOS",
)
parser.add_argument(
"--services",
metavar="<uuid>",
nargs="*",
help="UUIDs of one or more services to filter for",
)

arguments = parser.parse_args()

try:
asyncio.run(main(arguments))
except KeyboardInterrupt:
pass

0 comments on commit aaa5788

Please sign in to comment.