11/28/2021 Edit :
If you need to connect your microbit to your computer using Bluetooth Low Energy, and do stuff when the button is clicked. Jump straight and follow @ukBaz's answer below.
Note: The solution will work perfectly on GNU/Linux, but maybe not so much on Windows.
Below is the original question of the post. I'm not going to edit it to hide my mistakes.
Summary: I have a microbit connected to a rpi-zero. I coded the microbit, when A button
is pressed it will then send data through uart.write
to the rpi-zero.
In this test, the microbit will uart.write("Test")
, write a "Test" word to the rpi-zero.
My ultimate goal is to use rpi-zero's BLE capabilities to act as a control device with instructions sent from microbit buttons.
I found this GATT Server Code written in python for rpi. Which it ran with no problem at all.
The code below will be used for listening to microbit uart service and check whether if data received is "Test"
:
import serial
serialPort = serial.Serial(port = "/dev/ttyACM0", baudrate=115200, bytesize=8, timeout=0.5, stopbits=serial.STOPBITS_ONE)
serialString = " "
(serialPort.in_waiting > 0)
while True:
serialString = serialPort.readline()
if serialString == b'Test':
print("Yes")
else:
print("F")
But the real problem is when I try to implement this loop code into the GATT server code.
I cannot seem to get my head around on how to pass this value to self.send_tx
Moreover, it seems that there is a global loop already in the GATT server code. So I tried to use threading to run both of the functions simultaneously but when I add self.send_tx("Test")
it will just throw an error Self is not defined
.
I'm sorry I'm a total noob to coding, does anyone know the possible fix to this? Thank you
Here's the full code:
import sys
import threading
import dbus, dbus.mainloop.glib
import serial
from gi.repository import GLib
from example_advertisement import Advertisement
from example_advertisement import register_ad_cb, register_ad_error_cb
from example_gatt_server import Service, Characteristic
from example_gatt_server import register_app_cb, register_app_error_cb
BLUEZ_SERVICE_NAME = 'org.bluez'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
UART_SERVICE_UUID = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
UART_RX_CHARACTERISTIC_UUID = '6e400002-b5a3-f393-e0a9-e50e24dcca9e'
UART_TX_CHARACTERISTIC_UUID = '6e400003-b5a3-f393-e0a9-e50e24dcca9e'
LOCAL_NAME = 'rpi-gatt-server'
mainloop = None
serialPort = serial.Serial(port = "/dev/ttyACM0", baudrate=115200, bytesize=8, timeout=0.8, stopbits=serial.STOPBITS_ONE)
serialString = " "
(serialPort.in_waiting > 0)
class TxCharacteristic(Characteristic):
def __init__(self, bus, index, service):
Characteristic.__init__(self, bus, index, UART_TX_CHARACTERISTIC_UUID,
['notify'], service)
self.notifying = False
GLib.io_add_watch(sys.stdin, GLib.IO_IN, self.on_console_input)
def on_console_input(self, fd, condition):
s = fd.readline()
if s.isspace():
pass
else:
self.send_tx(s)
return True
def send_tx(self, s):
if not self.notifying:
return
value = []
for c in s:
value.append(dbus.Byte(c.encode()))
self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': value}, [])
def StartNotify(self):
if self.notifying:
print("yes")
return
self.notifying = True
def StopNotify(self):
if not self.notifying:
print("no")
return
self.notifying = False
class RxCharacteristic(Characteristic):
def __init__(self, bus, index, service):
Characteristic.__init__(self, bus, index, UART_RX_CHARACTERISTIC_UUID,
['write'], service)
def WriteValue(self, value, options):
print('remote: {}'.format(bytearray(value).decode()))
class UartService(Service):
def __init__(self, bus, index):
Service.__init__(self, bus, index, UART_SERVICE_UUID, True)
self.add_characteristic(TxCharacteristic(bus, 0, self))
self.add_characteristic(RxCharacteristic(bus, 1, self))
class Application(dbus.service.Object):
def __init__(self, bus):
self.path = '/'
self.services = []
dbus.service.Object.__init__(self, bus, self.path)
def get_path(self):
return dbus.ObjectPath(self.path)
def add_service(self, service):
self.services.append(service)
@dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
def GetManagedObjects(self):
response = {}
for service in self.services:
response[service.get_path()] = service.get_properties()
chrcs = service.get_characteristics()
for chrc in chrcs:
response[chrc.get_path()] = chrc.get_properties()
return response
class UartApplication(Application):
def __init__(self, bus):
Application.__init__(self, bus)
self.add_service(UartService(bus, 0))
class UartAdvertisement(Advertisement):
def __init__(self, bus, index):
Advertisement.__init__(self, bus, index, 'peripheral')
self.add_service_uuid(UART_SERVICE_UUID)
self.add_local_name(LOCAL_NAME)
self.include_tx_power = True
def find_adapter(bus):
remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
DBUS_OM_IFACE)
objects = remote_om.GetManagedObjects()
for o, props in objects.items():
if LE_ADVERTISING_MANAGER_IFACE in props and GATT_MANAGER_IFACE in props:
return o
print('Skip adapter:', o)
return None
def check():
while True:
serialString = serialPort.readline()
if serialString == b'Test':
print("Okay, Test")
self.send_tx("Test")
else:
print("No")
def main():
global mainloop
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
adapter = find_adapter(bus)
if not adapter:
print('BLE adapter not found')
return
service_manager = dbus.Interface(
bus.get_object(BLUEZ_SERVICE_NAME, adapter),
GATT_MANAGER_IFACE)
ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
LE_ADVERTISING_MANAGER_IFACE)
app = UartApplication(bus)
adv = UartAdvertisement(bus, 0)
mainloop = GLib.MainLoop()
service_manager.RegisterApplication(app.get_path(), {},
reply_handler=register_app_cb,
error_handler=register_app_error_cb)
ad_manager.RegisterAdvertisement(adv.get_path(), {},
reply_handler=register_ad_cb,
error_handler=register_ad_error_cb)
try:
mainloop.run()
except KeyboardInterrupt:
adv.Release()
if __name__ == '__main__':
p1 = threading.Thread(target=main)
p2 = threading.Thread(target=check)
p1.start()
p2.start()
I think this might be an XY problem. What I have understood is that you want to send a button press from the micro:bit to the RPi via Bluetooth Low Energy (BLE). If that is the case, then there is a more efficient way to do this.
In the Bluetooth Profile for the micro:bit there is a button service and a button A status characteristic that can be used. Their UUIDs are:
BTN_SRV = 'E95D9882-251D-470A-A062-FA1922DFA9A8'
BTN_A_STATE = 'E95DDA90-251D-470A-A062-FA1922DFA9A8'
You need to set up the GATT server on the micro:bit. The most efficient way to do this is using https://makecode.microbit.org/#editor to create the following:
If you don't have Bluetooth on the left-hand menu then click on the cog in the top right of the screen, select Extensions
, and then select Bluetooth
to replace the Radio
extension.
The GATT client code on the RPi can be simplified by using the pydbus library for the D-Bus bindings.
With Bluetooth, rather than have a while
loop and keep polling the micro:bit, it is more efficient to have an event loop that subscribes to notifications from (in this case) the Button A characteristic. The function I have named btn_handler
is called each time the micro:bit button A is pressed or released.
The code below doesn't do the initial pairing between the micro:bit and the RPi. As pairing is a one-off provisioning step I do that manually.
Here is example python code for the RPi that responses to Button A on the micro:bit being pressed...
from time import sleep
import pydbus
from gi.repository import GLib
DEVICE_ADDR = 'DE:82:35:E7:CE:BE' # micro:bit address
BTN_A_STATE = 'E95DDA90-251D-470A-A062-FA1922DFA9A8'
# DBus object paths
BLUEZ_SERVICE = 'org.bluez'
ADAPTER_PATH = '/org/bluez/hci0'
device_path = f"{ADAPTER_PATH}/dev_{DEVICE_ADDR.replace(':', '_')}"
# setup dbus
bus = pydbus.SystemBus()
mngr = bus.get(BLUEZ_SERVICE, '/')
adapter = bus.get(BLUEZ_SERVICE, ADAPTER_PATH)
device = bus.get(BLUEZ_SERVICE, device_path)
device.Connect()
while not device.ServicesResolved:
sleep(0.5)
def get_characteristic_path(dev_path, uuid):
"""Look up DBus path for characteristic UUID"""
mng_objs = mngr.GetManagedObjects()
for path in mng_objs:
chr_uuid = mng_objs[path].get('org.bluez.GattCharacteristic1', {}).get('UUID')
if path.startswith(dev_path) and chr_uuid == uuid.casefold():
return path
# Characteristic DBus information
btn_a_path = get_characteristic_path(device._path, BTN_A_STATE)
btn_a = bus.get(BLUEZ_SERVICE, btn_a_path)
# Read button A without event loop notifications
print(btn_a.ReadValue({}))
# Enable eventloop for notifications
def btn_handler(iface, prop_changed, prop_removed):
"""Notify event handler for button press"""
if 'Value' in prop_changed:
new_value = prop_changed['Value']
print(f"Button A state: {new_value}")
print(f'As byte: {bytes(new_value)}')
print(f'As bytearray: {bytearray(new_value)}')
print(f'As int: {int(new_value[0])}')
print(f'As bool: {bool(new_value[0])}')
mainloop = GLib.MainLoop()
btn_a.onPropertiesChanged = btn_handler
btn_a.StartNotify()
try:
mainloop.run()
except KeyboardInterrupt:
mainloop.quit()
btn_a.StopNotify()
device.Disconnect()