I'm currently working on an Android app for a proprietary Bluetooth Low Energy device.
I decided to use RxAndroidBle and I'm happy with its relative ease of use compared to the built-in Bluetooth stack.
Where I'm running into problems: I need to subscribe to two characteristics and continuously read and aggregate their values.
Looking at this example page, http://polidea.github.io/RxAndroidBle/, I have been able to read multiple characteristics as per the example, but have been unsuccessful in combining multiple subscriptions.
This is what I've got:
subscriptionA = device.establishConnection(this, false)
    .flatMap(rxBleConnection -> rxBleConnection.setupNotification(aUUID))
    .doOnNext(notificationObservable -> {
        // Notification has been set up
    })
    .flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
    .subscribe(bytes -> {
        // Given characteristic has been changed, here is the value.
        System.out.printf("Received 03: %d\n\tdata: %s\n", bytes.length, Arrays.toString(bytes));
    });
subscriptionB = device.establishConnection(this, false)
    .flatMap(rxBleConnection -> rxBleConnection.setupNotification(bUUID))
    .doOnNext(notificationObservable -> {
        // Notification has been set up
    })
    .flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
    .subscribe(bytes -> {
        // Given characteristic has been changed, here is the value.
        System.out.printf("Received 05: %d\n\tdata: %s\n", bytes.length, Arrays.toString(bytes));
    });
It crashes as soon as it gets to the second subscription. What am I doing wrong?
Here's the error:
17:47:33.444 27758-27758/com.exam E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.exam, PID: 27758
rx.exceptions.OnErrorNotImplementedException
at rx.Observable$27.onError(Observable.java:7923)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:159)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:240)
at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:776)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:537)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:250)
at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:48)
at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:71)
at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:240)
at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:776)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:537)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:250)
at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:48)
at rx.observers.Subscribers$5.onError(Subscribers.java:225)
at rx.Observable$ThrowObservable$1.call(Observable.java:9984)
at rx.Observable$ThrowObservable$1.call(Observable.java:9974)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.subscribe(Observable.java:8191)
at rx.Observable.subscribe(Observable.java:8158)
at rx.Observable.subscribe(Observable.java:7914)
at com.exam.BTConnection.enableConnection(BTConnection.java:65)
at com.exam.MActivity$3.onServiceConnected(mActivity.java:104)
at android.app.LoadedApk$ServiceDispatcher.doConnected(LoadedApk.java:1208)
at android.app.LoadedApk$ServiceDispatcher$RunConnection.run(LoadedApk.java:1225)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:135)
at android.app.ActivityThread.main(ActivityThread.java:5254)
at java.lang.reflect.Method.invoke(Native Method)
at java.lang.reflect.Method.invoke(Method.java:372)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:903)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:698)
at de.robv.android.xposed.XposedBridge.main(XposedBridge.java:115)
Caused by: BleAlreadyConnectedException{macAddress=35:32:30:30:44:53}
at com.polidea.rxandroidble.internal.RxBleDeviceImpl.lambda$establishConnection$79(RxBleDeviceImpl.java:54)
at com.polidea.rxandroidble.internal.RxBleDeviceImpl.access$lambda$0(RxBleDeviceImpl.java)
at com.polidea.rxandroidble.internal.RxBleDeviceImpl$$Lambda$1.call(Unknown Source)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:46)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.subscribe(Observable.java:8191)
at rx.Observable.subscribe(Observable.java:8158)
at rx.Observable.subscribe(Observable.java:7914)
at com.exam.BTConnection.enableConnection(BTConnection.java:65)
at com.exam.MActivity$3.onServiceConnected(MActivity.java:104)
at android.app.LoadedApk$ServiceDispatcher.doConnected(LoadedApk.java:1208)
at android.app.LoadedApk$ServiceDispatcher$RunConnection.run(LoadedApk.java:1225)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:135)
at android.app.ActivityThread.main(ActivityThread.java:5254)
at java.lang.reflect.Method.invoke(Native Method)
at java.lang.reflect.Method.invoke(Method.java:372)
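You have made two mistakes:
1. You tried to establish two connections to the same device at the same time (Caused by: BleAlreadyConnectedException{macAddress=35:32:30:30:44:53}). A BLE connection is point-to-point, so, unlike HTTP, you cannot have two connections to the same device open at the same time.
2. You did not pass an error handler to subscribe(), so the connection error surfaced as a crash (rx.exceptions.OnErrorNotImplementedException).
How you should do it: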
subscription = device.establishConnection(this, false)
    .flatMap(rxBleConnection -> Observable.combineLatest( // use the same connection and combine the latest emissions
        // the IDE sometimes loses track of the type returned from an Observable, hence the explicit <byte[]>
        rxBleConnection.setupNotification(aUUID).<byte[]>flatMap(observable -> observable),
        rxBleConnection.setupNotification(bUUID).<byte[]>flatMap(observable -> observable),
        Pair::new // merge into a Pair
    ))
    .subscribe(
        byteArrayPair -> {
            // here you get the latest values from notifications
            byte[] aBytes = byteArrayPair.first;
            byte[] bBytes = byteArrayPair.second;
            // do your thing
        },
        throwable -> {
            // handle errors
        }
    );
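One thing to keep in mind about combineLatest: it emits nothing until both sources have emitted at least once, and after that every new value from either source is paired with the most recent value from the other. The sketch below imitates that behaviour in plain Java, without RxJava or RxAndroidBle, so it can be run anywhere. The class and method names (CombineLatestSketch, LatestPair, onA, onB) are made up for illustration and are not part of either library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CombineLatestSketch {

    /** Hypothetical stand-in for the combining step; not an RxJava or RxAndroidBle class. */
    static final class LatestPair {
        private byte[] latestA; // null until characteristic A has notified at least once
        private byte[] latestB; // null until characteristic B has notified at least once
        private final List<String> emitted = new ArrayList<>();

        void onA(byte[] value) {
            latestA = value;
            emitIfReady();
        }

        void onB(byte[] value) {
            latestB = value;
            emitIfReady();
        }

        // A pair is produced only once both sides have a value.
        private void emitIfReady() {
            if (latestA != null && latestB != null) {
                emitted.add(Arrays.toString(latestA) + " + " + Arrays.toString(latestB));
            }
        }

        List<String> emitted() {
            return emitted;
        }
    }

    public static void main(String[] args) {
        LatestPair pair = new LatestPair();
        pair.onA(new byte[] {1});  // nothing emitted yet: B has not notified
        pair.onB(new byte[] {10}); // first pair: A=[1], B=[10]
        pair.onA(new byte[] {2});  // new A is paired with the previous B
        pair.emitted().forEach(System.out::println);
    }
}
```

So if one of your characteristics notifies rarely, the subscriber will see nothing until its first value arrives, and afterwards its last value will be repeated in every pair.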
Best Regards