
Using pexpect to detect and handle notifications from Bluetooth LE


I've been writing Python code on a Raspberry Pi 3 Model B to read values received over Bluetooth LE. I can read the correct values with:

child.sendline("char-read-hnd handle")
child.expect("Characteristic value/descriptor: ",timeout=5)

What I am trying to do now is check for notifications at any time, so I have a thread that waits for the expected pattern "Notification handle=" like this:

def run():
  patterns = ['Notification handle=','Indication handle=']
  while True:
    try:
      matched_pattern_index = child.expect(patterns,timeout=1)
      if matched_pattern_index in {0,1}:
        print("Received Notification")
        handleNotification()
    except pexpect.TIMEOUT:
      pass

Meanwhile, my main code keeps calling child.sendline and child.expect to check for new values. The problem is that the child.expect call in the thread above blocks the other child.expect calls in my main code. I already tried creating a second child, similar to the first, for use inside the thread, but the result is the same. Does anyone have an idea how I can achieve this?

Any help would be appreciated. Thanks in advance.


Solution

  • I was thinking of subclassing pexpect.spawn and adding an expect_before(p, c) method that saves a list of patterns p and a matching list of callback functions c, then overriding expect(p2) so that it prepends the list p to the list p2 of that call before calling the real spawn.expect function. That way the notification patterns are checked on every expect call in the main code, and no separate thread has to call expect on the same child.

    If the real function then returns a match index i that falls within the list p, we call the function c[i] and loop again. When the index is beyond that list, we subtract len(p) so that it becomes an index into the list p2, and return from the call. Here's an approximation:

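    #!/usr/bin/python
    # https://stackoverflow.com/q/51622025/5008284
    from __future__ import print_function
    import sys, pexpect

    class Myspawn(pexpect.spawn):
      def __init__(self, *args, **kwargs):
        pexpect.spawn.__init__(self, *args, **kwargs)
        # Start with no "before" patterns so expect() also works
        # when expect_before() was never called.
        self.eb_pat = []
        self.eb_cb = []

      def expect_before(self, patterns, callbacks):
        # callbacks[i] is called whenever patterns[i] matches
        self.eb_pat = list(patterns)
        self.eb_cb = list(callbacks)

      def expect(self, patlist, *args, **kwargs):
        # Accept a single pattern as well as a list
        if not isinstance(patlist, list):
          patlist = [patlist]
        numbefore = len(self.eb_pat)
        while True:
          rc = pexpect.spawn.expect(self, self.eb_pat+patlist, *args, **kwargs)
          if rc>=numbefore:
            break
          self.eb_cb[rc]() # call callback, then keep waiting
        return rc-numbefore # index into the caller's own list

    # Placeholder handlers; replace with your own code.
    def handler1():
      print("handler1 called")

    def handler2():
      print("handler2 called")

    def test():
      # On Python 3, logfile=sys.stdout may also need encoding='utf-8'
      child = Myspawn("sudo bluetoothctl", logfile=sys.stdout)
      patterns = ['Device FC:A8:9A:..:..:..', 'Device C8:FD:41:..:..:..']
      callbacks = [lambda :handler1(), lambda :handler2()]
      child.expect_before(patterns, callbacks)
      child.sendline('scan on')
      while True:
        # Escape the brackets; unescaped, [bluetooth] is a character class
        child.expect([r'\[bluetooth\].*?#'], timeout=5)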
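The index arithmetic above can be tried without a Bluetooth device or pexpect installed. The following is only a rough sketch: `FakeChild`, `expect_with_before` and the canned output lines are made up for illustration. Unlike pexpect, which matches against a stream, the fake child matches one line at a time. The pattern strings are the ones from the question.

```python
import re


def expect_with_before(expect_fn, before_patterns, callbacks, patterns):
    """Prefix before_patterns onto patterns, run the callback for every
    prefix hit, and return the match index relative to `patterns`."""
    numbefore = len(before_patterns)
    combined = list(before_patterns) + list(patterns)
    while True:
        rc = expect_fn(combined)
        if rc >= numbefore:
            return rc - numbefore   # index into the caller's own list
        callbacks[rc]()             # "before" pattern hit: handle it, keep waiting


class FakeChild:
    """Hypothetical stand-in for a spawned process; feeds canned lines."""

    def __init__(self, lines):
        self.lines = list(lines)

    def expect(self, patterns):
        # Consume lines until one matches; return the index of the
        # first pattern that matches that line.
        while self.lines:
            line = self.lines.pop(0)
            for i, pat in enumerate(patterns):
                if re.search(pat, line):
                    return i
        raise EOFError("no more output")


seen = []
# Made-up output, for illustration only
child = FakeChild([
    "Notification handle=0x000e value: 01",
    "unrelated line",
    "Characteristic value/descriptor: 2a",
    "Indication handle=0x0010 value: 02",
    "Error: something went wrong",
])
before = ["Notification handle=", "Indication handle="]
callbacks = [lambda: seen.append("notification"),
             lambda: seen.append("indication")]
wanted = ["Characteristic value/descriptor: ", "Error"]

first = expect_with_before(child.expect, before, callbacks, wanted)
second = expect_with_before(child.expect, before, callbacks, wanted)
print(first, second, seen)
```

Each call returns an index into `wanted` only, so the main code does not need to know how many "before" patterns are registered, and the callbacks run in the same thread as the caller.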