
How to get notifications from BLE Device using pygatt in python?


I am developing a Linux application in Python that connects to my BLE device and receives data through a notify characteristic. I am using pygatt for BLE communication. I can successfully connect and bond to the device, and I can read from and write to its characteristics. I can even subscribe to the notify characteristic. The problem is this: my BLE device is a custom machine with 4 counters inside it. Every time one of the counters changes, the device sets the corresponding notification flag, so in an onDataChanged-style callback I should be able to read the counter's data from the read characteristic. In Python, using pygatt, I subscribe to the notify characteristic with:

class_name.device.subscribe(uuid.UUID(notify_characteristic),callback=notifyBle)

and notifyBle is:

def notifyBle(self,handle,data):
    read_data = class_name.device.char_read(uuid.UUID(read_characteristic))
    print(read_data)

When I run the program, I first scan for devices, connect to my device, and bond with it. Then I discover the characteristics and list them. All of this succeeds. After listing the characteristics, I write to the write characteristic to clear the notification flags, which also succeeds. Finally, I subscribe to the notify characteristic, and that succeeds as well.
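For readers following along, the sequence described above could be sketched roughly as follows. This is only an illustration, not the asker's actual code: the function name `connect_and_subscribe` and all of its parameters are hypothetical, and the value needed to clear the flags (`clear_value`) is device-specific and not given in the question. The `adapter` argument is assumed to behave like `pygatt.GATTToolBackend()`, and `address_type` would be something like `pygatt.BLEAddressType.random`.

```python
def connect_and_subscribe(adapter, address, address_type,
                          write_uuid, notify_uuid, clear_value, on_notify):
    """Connect, bond, discover, clear the flags, then subscribe.

    All parameter names are hypothetical; `adapter` is assumed to expose
    the same methods as pygatt's GATTToolBackend.
    """
    adapter.start()
    device = adapter.connect(address, address_type=address_type)
    device.bond()

    # Returns a mapping keyed by characteristic UUID.
    characteristics = device.discover_characteristics()

    # Device-specific write that clears the notification flags.
    device.char_write(write_uuid, clear_value)

    # on_notify(handle, value) is called for each notification.
    device.subscribe(notify_uuid, callback=on_notify)
    return device, characteristics
```

Note that the callback passed to `subscribe` receives two arguments, the handle and the notified value.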

After all of these steps, I increase my device's counters physically (there are buttons on the device for increasing the counters). When I press a button, the program enters the notifyBle method and raises this error:

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 137, in run
    event["callback"](event)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 479, in _handle_notification_string
    self._connected_device.receive_notification(handle, values)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/device.py", line 226, in receive_notification
    callback(handle, value)
  File "/home/acd/Masaüstü/python_workspace/ble.py", line 54, in notifyBle
    read_data = bleFunctions.dev.char_read(uuid.UUID(bleFunctions.read_characteristic))
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/device.py", line 17, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/device.py", line 40, in char_read
    return self._backend.char_read(self, uuid, *args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 53, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 519, in char_read
    self.sendline('char-read-uuid %s' % uuid)
  File "/usr/lib/python3.5/contextlib.py", line 66, in __exit__
    next(self.gen)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 180, in event
    self.wait(event, timeout)
  File "/usr/local/lib/python3.5/dist-packages/pygatt/backends/gatttool/gatttool.py", line 154, in wait
    raise NotificationTimeout()
pygatt.exceptions.NotificationTimeout

Any help would be appreciated.

PS: I wrote the exact same program for Android and for Windows UWP. With Python, I am aiming to run this on a Raspberry Pi 3.

PPS: I am using a Raspberry Pi 3 with Ubuntu MATE installed to develop this program in Python.


Solution

  • First, create an event class like the one below:

    class Event:
        def __init__(self):
            self.handlers = set()
    
        def handle(self, handler):
            self.handlers.add(handler)
            return self
    
        def unhandle(self, handler):
            try:
                self.handlers.remove(handler)
            except KeyError:
                raise ValueError("Handler is not handling this event, so cannot unhandle it.")
            return self
    
        def fire(self, *args, **kargs):
            for handler in self.handlers:
                handler(*args, **kargs)
    
        def getHandlerCount(self):
            return len(self.handlers)
    
        __iadd__ = handle
        __isub__ = unhandle
        __call__ = fire
        __len__  = getHandlerCount
    

    Save it as eventclass.py, then create a BLE class:

    import uuid
    
    import pygatt
    from eventclass import Event
    
    class myBle:
        ADDRESS_TYPE = pygatt.BLEAddressType.random
        # The x's are placeholders; substitute your device's characteristic UUIDs.
        read_characteristic = "0000xxxx-0000-1000-8000-00805f9b34fb"
        write_characteristic = "0000xxxx-0000-1000-8000-00805f9b34fb"
        notify_characteristic = "0000xxxx-0000-1000-8000-00805f9b34fb"
    
        def __init__(self, device):
            self.device = device
            self.valueChanged = Event()
            self.checkdata = False
    
        def alert(self):
            # Fire the event so every registered handler receives the current flag.
            self.valueChanged(self.checkdata)
    
        def write(self, data):
            # data is raw bytes, e.g. bytearray(b'\x10\x00').
            self.device.char_write(self.write_characteristic, data)
    
        def notify(self, handle, data):
            # Subscription callback: only set the flag here, do no BLE I/O.
            self.checkdata = True
    
        def read(self):
            # Call this outside the notification callback.
            if self.checkdata:
                self.read_data = self.device.char_read(uuid.UUID(self.read_characteristic))
                self.write(bytearray(b'\x10\x00'))  # presumably clears the notification flags
                self.checkdata = False
                return self.read_data
    
        def discover(self):
            return self.device.discover_characteristics().keys()
    

    When a notification arrives, the notify method sets the boolean flag to True, and the alert method fires the event to report the flag's current value. Listen for that event with:

    def triggerEvent(checkdata):
       print(str(checkdata))
    
    ble = myBle(device)
    ble.valueChanged += triggerEvent
    ble.alert()
    

    You can then call the read method from triggerEvent to get the characteristic's value.
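The traceback in the question suggests that the callback runs on pygatt's own receiver thread, so a `char_read` issued from inside it appears to wait for a response that the same thread would have to deliver, which would explain the `NotificationTimeout`. As a minimal sketch of the same idea as the flag above (record the notification in the callback, read later from the main thread), the example below swaps the boolean for a standard-library `queue.Queue`. The names `DeferredReader`, `FakeDevice`, and `READ_UUID` are hypothetical; `FakeDevice` only stands in for a real pygatt device so the sketch runs without hardware.

```python
import queue
import uuid

# Hypothetical placeholder UUID; substitute your device's read characteristic.
READ_UUID = uuid.UUID("12345678-1234-5678-1234-56789abcdef0")


class FakeDevice:
    """Hypothetical stand-in for a pygatt device (no hardware needed)."""

    def __init__(self):
        self.counter = 0

    def char_read(self, char_uuid):
        return bytearray([self.counter])


class DeferredReader:
    def __init__(self, device):
        self.device = device
        self.pending = queue.Queue()

    def on_notify(self, handle, value):
        # Runs on the notification thread: no BLE I/O, just record the event.
        self.pending.put(handle)

    def read_next(self, timeout=1.0):
        # Runs on the caller's (main) thread: wait for a recorded
        # notification, then perform the actual read.
        try:
            self.pending.get(timeout=timeout)
        except queue.Empty:
            return None
        return self.device.char_read(READ_UUID)


device = FakeDevice()
reader = DeferredReader(device)

device.counter = 3                          # simulate a button press
reader.on_notify(0x0E, bytearray(b"\x01"))  # simulate the notification
result = reader.read_next()                 # the read happens here, not in the callback
```

With a real device, `reader.on_notify` would be passed as the `callback` argument of `device.subscribe(...)`, and `read_next` would be called in a loop on the main thread.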