RxAndroidBle crash on subscribe to multiple characteristics


I'm currently working on an Android app for a proprietary Bluetooth Low Energy device.

I decided to use RxAndroidBle, and I'm happy with its relative ease of use compared to the built-in Bluetooth stack.

Where I'm running into problems: I need to subscribe to two characteristics and continuously read and aggregate their values.

Following the examples at http://polidea.github.io/RxAndroidBle/, I have been able to read multiple characteristics, but I have not managed to combine multiple notification subscriptions.

This is what I've got:

    subscriptionA = device.establishConnection(this, false)
            .flatMap(rxBleConnection -> rxBleConnection.setupNotification(aUUID))
            .doOnNext(notificationObservable -> {
                // Notification has been set up
            })
            .flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
            .subscribe(bytes -> {
                // The characteristic has changed; here is its new value.
                System.out.printf("Received 03: %d\n\tdata: %s\n", bytes.length, Arrays.toString(bytes));
            });
    subscriptionB = device.establishConnection(this, false)
            .flatMap(rxBleConnection -> rxBleConnection.setupNotification(bUUID))
            .doOnNext(notificationObservable -> {
                // Notification has been set up
            })
            .flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
            .subscribe(bytes -> {
                // The characteristic has changed; here is its new value.
                System.out.printf("Received 05: %d\n\tdata: %s\n", bytes.length, Arrays.toString(bytes));
            });

It crashes as soon as it gets to the second subscription. What am I doing wrong?

Here's the error:

17:47:33.444 27758-27758/com.exam E/AndroidRuntime: FATAL EXCEPTION: main
                                                                      Process: com.exam, PID: 27758
                                                                      rx.exceptions.OnErrorNotImplementedException
                                                                          at rx.Observable$27.onError(Observable.java:7923)
                                                                          at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:159)
                                                                          at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:240)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:776)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:537)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:250)
                                                                          at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:48)
                                                                          at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:71)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:240)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:776)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:537)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:250)
                                                                          at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:48)
                                                                          at rx.observers.Subscribers$5.onError(Subscribers.java:225)
                                                                          at rx.Observable$ThrowObservable$1.call(Observable.java:9984)
                                                                          at rx.Observable$ThrowObservable$1.call(Observable.java:9974)
                                                                          at rx.Observable.unsafeSubscribe(Observable.java:8098)
                                                                          at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
                                                                          at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
                                                                          at rx.Observable$2.call(Observable.java:162)
                                                                          at rx.Observable$2.call(Observable.java:154)
                                                                          at rx.Observable$2.call(Observable.java:162)
                                                                          at rx.Observable$2.call(Observable.java:154)
                                                                          at rx.Observable$2.call(Observable.java:162)
                                                                          at rx.Observable$2.call(Observable.java:154)
                                                                          at rx.Observable$2.call(Observable.java:162)
                                                                          at rx.Observable$2.call(Observable.java:154)
                                                                          at rx.Observable$2.call(Observable.java:162)
                                                                          at rx.Observable$2.call(Observable.java:154)
                                                                          at rx.Observable.subscribe(Observable.java:8191)
                                                                          at rx.Observable.subscribe(Observable.java:8158)
                                                                          at rx.Observable.subscribe(Observable.java:7914)
                                                                          at com.exam.BTConnection.enableConnection(BTConnection.java:65)
                                                                          at com.exam.MActivity$3.onServiceConnected(mActivity.java:104)
                                                                          at android.app.LoadedApk$ServiceDispatcher.doConnected(LoadedApk.java:1208)
                                                                          at android.app.LoadedApk$ServiceDispatcher$RunConnection.run(LoadedApk.java:1225)
                                                                          at android.os.Handler.handleCallback(Handler.java:739)
                                                                          at android.os.Handler.dispatchMessage(Handler.java:95)
                                                                          at android.os.Looper.loop(Looper.java:135)
                                                                          at android.app.ActivityThread.main(ActivityThread.java:5254)
                                                                          at java.lang.reflect.Method.invoke(Native Method)
                                                                          at java.lang.reflect.Method.invoke(Method.java:372)
                                                                          at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:903)
                                                                          at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:698)
                                                                          at de.robv.android.xposed.XposedBridge.main(XposedBridge.java:115)
                                                                       Caused by: BleAlreadyConnectedException{macAddress=35:32:30:30:44:53}
                                                                          at com.polidea.rxandroidble.internal.RxBleDeviceImpl.lambda$establishConnection$79(RxBleDeviceImpl.java:54)
                                                                          at com.polidea.rxandroidble.internal.RxBleDeviceImpl.access$lambda$0(RxBleDeviceImpl.java)
                                                                          at com.polidea.rxandroidble.internal.RxBleDeviceImpl$$Lambda$1.call(Unknown Source)
                                                                          at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:46)
                                                                          at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35) 
                                                                          at rx.Observable$2.call(Observable.java:162) 
                                                                          at rx.Observable$2.call(Observable.java:154) 
                                                                          at rx.Observable$2.call(Observable.java:162) 
                                                                          at rx.Observable$2.call(Observable.java:154) 
                                                                          at rx.Observable$2.call(Observable.java:162) 
                                                                          at rx.Observable$2.call(Observable.java:154) 
                                                                          at rx.Observable$2.call(Observable.java:162) 
                                                                          at rx.Observable$2.call(Observable.java:154) 
                                                                          at rx.Observable$2.call(Observable.java:162) 
                                                                          at rx.Observable$2.call(Observable.java:154) 
                                                                          at rx.Observable.subscribe(Observable.java:8191) 
                                                                          at rx.Observable.subscribe(Observable.java:8158) 
                                                                          at rx.Observable.subscribe(Observable.java:7914) 
                                                                          at com.exam.BTConnection.enableConnection(BTConnection.java:65) 
                                                                          at com.exam.MActivity$3.onServiceConnected(MActivity.java:104) 
                                                                          at android.app.LoadedApk$ServiceDispatcher.doConnected(LoadedApk.java:1208) 
                                                                          at android.app.LoadedApk$ServiceDispatcher$RunConnection.run(LoadedApk.java:1225) 
                                                                          at android.os.Handler.handleCallback(Handler.java:739) 
                                                                          at android.os.Handler.dispatchMessage(Handler.java:95) 
                                                                          at android.os.Looper.loop(Looper.java:135) 
                                                                          at android.app.ActivityThread.main(ActivityThread.java:5254) 
                                                                          at java.lang.reflect.Method.invoke(Native Method) 
                                                                          at java.lang.reflect.Method.invoke(Method.java:372) 

Solution

  • You have made two mistakes:

    1. You tried to establish two connections to the same device. The log shows this (Caused by: BleAlreadyConnectedException{macAddress=35:32:30:30:44:53}). A BLE connection is point-to-point, so, unlike HTTP, you cannot have two connections to the same device open at the same time.
    2. You did not handle errors that can occur while connecting. The log shows this as well (rx.exceptions.OnErrorNotImplementedException): subscribe was called without an error handler, so the connection error had nowhere to go and crashed the app.

    How you should do it:

        subscription = device.establishConnection(this, false)
                .flatMap(rxBleConnection -> Observable.combineLatest( // use the same connection and combine latest emissions
                        rxBleConnection.setupNotification(aUUID).<byte[]>flatMap(observable -> observable), // the explicit <byte[]> helps when the IDE cannot infer the type emitted by the Observable
                        rxBleConnection.setupNotification(bUUID).<byte[]>flatMap(observable -> observable),
                        Pair::new // merge into a Pair
                ))
                .subscribe(
                        byteArrayPair -> {
                            // here you get the latest values from notifications
                            byte[] aBytes = byteArrayPair.first;
                            byte[] bBytes = byteArrayPair.second; 
                            // do your thing
                        },
                        throwable -> {
                            // handle errors
                        }
                );
    

    Best Regards
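
To make the behaviour of combineLatest in the snippet above more concrete, here is a minimal plain-Java sketch of its pairing rule. It is only a toy model: CombineLatestSketch, onA and onB are hypothetical names invented for this illustration and are not part of RxJava or RxAndroidBle, and the sketch is single-threaded, whereas the real operator also deals with concurrency, errors and completion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

/** Toy stand-in for combineLatest over two notification streams (not RxJava). */
class CombineLatestSketch {
    private byte[] latestA; // last value from characteristic A, null until it first notifies
    private byte[] latestB; // last value from characteristic B, null until it first notifies
    private final BiConsumer<byte[], byte[]> downstream;

    CombineLatestSketch(BiConsumer<byte[], byte[]> downstream) {
        this.downstream = downstream;
    }

    /** Call when characteristic A notifies. */
    void onA(byte[] value) {
        latestA = value;
        emitIfReady();
    }

    /** Call when characteristic B notifies. */
    void onB(byte[] value) {
        latestB = value;
        emitIfReady();
    }

    // Emit a pair only once both sides have produced at least one value.
    private void emitIfReady() {
        if (latestA != null && latestB != null) {
            downstream.accept(latestA, latestB);
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        CombineLatestSketch combiner = new CombineLatestSketch(
                (a, b) -> seen.add(Arrays.toString(a) + " + " + Arrays.toString(b)));

        combiner.onA(new byte[] {1}); // nothing emitted yet: B has not notified
        combiner.onB(new byte[] {9}); // first pair: [1] + [9]
        combiner.onA(new byte[] {2}); // new A is paired with the previous B: [2] + [9]

        seen.forEach(System.out::println);
    }
}
```

Two consequences follow for the aggregation code: nothing arrives until both characteristics have notified at least once, and each later notification is delivered together with the other characteristic's most recent (possibly older) value.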