Search code examples
pythonbluetooth-lowenergygatt

Use Python and bleak library to notify a bluetooth GATT device, but the result is not stable


I try to use Python to control some BLE GATT devices. I find Bleak (https://bleak.readthedocs.io/en/latest/) this library to communicate with GATT device.

When I use it, most of the time it works really well.

But once I try to communicate with a BLE GATT medical equipment, I found I can't always notify with this equipment.

Here is my code, I just do a little changes from Bleak's github example:(https://github.com/hbldh/bleak/blob/develop/examples/enable_notifications.py)

import asyncio
import logging

from bleak import discover
from bleak import BleakClient

devices_dict = {}
devices_list = []
receive_data = []

#To discover BLE devices nearby 
async def scan():
    dev = await discover()
    for i in range(0,len(dev)):
        #Print the devices discovered
        print("[" + str(i) + "]" + dev[i].address,dev[i].name,dev[i].metadata["uuids"])
        #Put devices information into list
        devices_dict[dev[i].address] = []
        devices_dict[dev[i].address].append(dev[i].name)
        devices_dict[dev[i].address].append(dev[i].metadata["uuids"])
        devices_list.append(dev[i].address)

#An easy notify function, just print the recieve data
def notification_handler(sender, data):
    print(', '.join('{:02x}'.format(x) for x in data))

async def run(address, debug=False):
    log = logging.getLogger(__name__)
    if debug:
        import sys

        log.setLevel(logging.DEBUG)
        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.DEBUG)
        log.addHandler(h)

    async with BleakClient(address) as client:
        x = await client.is_connected()
        log.info("Connected: {0}".format(x))

        for service in client.services:
            log.info("[Service] {0}: {1}".format(service.uuid, service.description))
            for char in service.characteristics:
                if "read" in char.properties:
                    try:
                        value = bytes(await client.read_gatt_char(char.uuid))
                    except Exception as e:
                        value = str(e).encode()
                else:
                    value = None
                log.info(
                    "\t[Characteristic] {0}: (Handle: {1}) ({2}) | Name: {3}, Value: {4} ".format(
                        char.uuid,
                        char.handle,
                        ",".join(char.properties),
                        char.description,
                        value,
                    )
                )
                for descriptor in char.descriptors:
                    value = await client.read_gatt_descriptor(descriptor.handle)
                    log.info(
                        "\t\t[Descriptor] {0}: (Handle: {1}) | Value: {2} ".format(
                            descriptor.uuid, descriptor.handle, bytes(value)
                        )
                    )

                #Characteristic uuid
                CHARACTERISTIC_UUID = "put your characteristic uuid"

                await client.start_notify(CHARACTERISTIC_UUID, notification_handler)
                await asyncio.sleep(5.0)
                await client.stop_notify(CHARACTERISTIC_UUID)

if __name__ == "__main__":
    print("Scanning for peripherals...")

    #Build an event loop
    loop = asyncio.get_event_loop()
    #Run the discover event
    loop.run_until_complete(scan())

    #let user chose the device
    index = input('please select device from 0 to ' + str(len(devices_list)) + ":")
    index = int(index)
    address = devices_list[index]
    print("Address is " + address)

    #Run notify event
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    loop.run_until_complete(run(address, True))

In most situation, this code can work well as this image.

But sometimes (about 25% ratio), this problem just happened:

[0]08:6B:D7:12:F1:33 Nonin3150_502892837['uuid']
[1]1D:BD:4A:69:8B:AB Unknown []
[2]73:15:CD:47:AF:08 Unknown []
[3]40:4E:36:5B:8D:1B HTC BS 1BBDB9 ['uuid']
[4]6B:FB:E5:DD:7F:4E Unknown []
[5]69:A7:87:23:5C:7C Unknown []
please select device from 0 to 6:0
Address is 08:6B:D7:12:F1:33
Traceback (most recent call last):
  File "D:/Bletest/nonin_test.py", line 91, in <module>
    loop.run_until_complete(run(address, True))
  File "C:\Users\rizal\AppData\Local\Programs\Python\Python37\lib\asyncio\base_events.py", line 579, in run_until_complete
    return future.result()
  File "D:/Bletest/nonin_test.py", line 36, in run
    async with BleakClient(address) as client:
  File "C:\Users\rizal\AppData\Local\Programs\Python\Python37\lib\site-packages\bleak\backends\client.py", line 60, in __aenter__
    await self.connect()
  File "C:\Users\rizal\AppData\Local\Programs\Python\Python37\lib\site-packages\bleak\backends\dotnet\client.py", line 154, in connect
    "Device with address {0} was not found.".format(self.address)
bleak.exc.BleakError: Device with address 08:6B:D7:12:F1:33 was not found.

Process finished with exit code 1

I have no idea why the program can discover this equipment, but can't notify with it.

Is this my code's problem, or it may caused by this equipment's program flow?


Solution

  • I do have the same problem. However, I think it has something to do with the implementation of BLE in Windows. When you scan for devices in the Windows interface you are sometimes able to see how devices appear and dissapear. The devices Advertising Interval could be to long.

    However, it is fixable by wrapping it in a try catch section.

    Something like this could work for you.

        async def connect_to_device(self):
            while True:
                if self.connection_enabled:
                    try:
                        await self.client.connect()
                        self.connected = await self.client.is_connected()
                        if self.connected:
                            print("Connected to Device")
                            self.client.set_disconnected_callback(self.on_disconnect)
                            await self.client.start_notify(
                                self.notify_characteristic, self.notify_callback,
                            )
                            while True:
                                if not self.connected:
                                    break
                                await asyncio.sleep(1.0)
                        else:
                            print(f"Failed to connect to Device")
                    except Exception as e:
                        print(e)
                else:
                    await asyncio.sleep(1.0)
    

    You just need to add this task to the loop

    asyncio.ensure_future(self.connect_to_device(), loop)
    

    define the client

    self.client = BleakClient(self.connected_device.address, loop=loop)
    

    and enable the connection

    self.connection_enabled = true