Search code examples
javaandroidbluetoothrxandroidble

RxAndroidBle state machine setup notification produce infinite loop onDoNext


Currently I am trying to refactor my source code to use the RxAndroidBle library.

I implemented a state machine which call on every Bluetooth successful operation (notify on, write, read and so on) the next state. The rxBleConnection should always be established as long the Bluetooth device is running.

My implementation for the setup notification looks like follow:

protected void setNotificationOn(UUID characteristic) {
    if (isConnected()) {
        final Disposable disposable = connectionObservable
                .flatMap(rxBleConnection -> rxBleConnection.setupNotification(characteristic))
                .doOnNext(notificationObservable -> {
                        Timber.d("Successful set notification on for %s", BluetoothGattUuid.prettyPrint(characteristic));
                        nextMachineStateStep();
                        }
                )
                .flatMap(notificationObservable -> notificationObservable)
                .observeOn(AndroidSchedulers.mainThread())
                .retry(BT_RETRY_TIMES_ON_ERROR)
                .subscribe(
                        bytes -> {
                            onBluetoothNotify(characteristic, bytes);
                            Timber.d("onCharacteristicChanged %s: %s",
                                    BluetoothGattUuid.prettyPrint(characteristic),
                                    byteInHex(bytes));
                            },
                        throwable -> onError(throwable)
                );

        compositeDisposable.add(disposable);
    }
}

if I only once set on the notification for the characteristic it works but if an error happened or if I try to set it once again I will stuck in a infinity loop in the doOnNext and the method setNotificationOn is never terminated. I thought doOnNext is only called once on successful notification setup!? (the BleConflictingNotificationAlreadySetException is never thrown nor other exceptions!?)

How can I repeatedly setup a notification on the same characteristic?

Is there a better way to create a state machine with RxAndroidBle/RxJava2? (I need to call various Bluetooth operation sequential)

You will find my full implementation here

Edit:

However if one would try to subscribe again to .setupNotification() for the same characteristic both subscribers will share the same notification.

Thanks for the explanation, so doOnNext() is not called again because if I subscribe to setupNotification() again the notification is already successful setup.

Is it possible to check if the notification is already setup (so that I can skip this step to go on to my next machine state)?

why your API looks like this or what exactly you want to achieve

I want to achieve that the Bluetooth communication via RxAndroidBle are abstracted, so that the subclass of class BluetoothCommunication can just use the methods:

protected void writeBytes(UUID characteristic, byte[] bytes)
protected void readBytes(UUID characteristic)
protected void setIndicationOn(UUID characteristic)
protected void setNotificationOn(UUID characteristic)
/*  Method is triggered if a Bluetooth data is read from a device. */
protected void onBluetoothRead(UUID characteristic, byte[] value) {}
/* Method is triggered if a Bluetooth data from a device is notified */
protected void onBluetoothNotify(UUID characteristic, byte[] value) {}

to set up the Bluetooth communication sequence. The Bluetooth communication / sequence / algorithm is always differently and can't be parametrisable. But it is always sequential. To initialize the Bluetooth scale firstly I need to setup notify A then B. Secondly, I have to send command C after that I will receive one or many notify calls after that I have to send command B and so on. I support many Bluetooth scales from various vendors, the amount of commands / setup notification and algorithm are always differently.

You can see the subclasses with their various algorithm here

In my experience RxJava2 does not need to have an additional state machine as it is already a library that handles state

How can I abstract the library handle state for the subclasses?


Solution

  • I thought doOnNext is only called once on successful notification setup!?

    In your case this .doOnNext() will get called for each successful notification setup after a connection is established. If the connection will be broken by any cause the .retry() operator will resubscribe to connectionObservable which may start a new connection that would trigger a new notification setup.

    Conclusion: your .doOnNext() may be called up to BT_RETRY_TIMES_ON_ERROR times for each .subscribe()

    How can I repeatedly setup a notification on the same characteristic?

    A notification is active as long as .setupNotification() is subscribed and there is no need to setup it again. However if one would try to subscribe again to .setupNotification() for the same characteristic both subscribers will share the same notification. In your example the notification will be active until the compositeDisposable will be disposed or the connection will break (in which case a new connection will be established up to BT_RETRY_TIMES_ON_ERROR times for each subscribe as mentioned above).

    Is there a better way to create a state machine with RxAndroidBle/RxJava2? (I need to call various Bluetooth operation sequential)

    RxJava2 allows for building state transformations and schedule subscribing to particular Observable in many different manners (sequential as well) e.g.

    Disposable disposable = Observable.concat(Arrays.asList(
            Observable.just("start").doOnSubscribe(disposable1 -> Log.d("Subscribe", "start")),
            Observable.just("0").delay(3, TimeUnit.SECONDS).doOnSubscribe(disposable1 -> Log.d("Subscribe", "0")),
            Observable.just("1").delay(2, TimeUnit.SECONDS).doOnSubscribe(disposable1 -> Log.d("Subscribe", "1")),
            Observable.just("2").delay(1, TimeUnit.SECONDS).doOnSubscribe(disposable1 -> Log.d("Subscribe", "2")),
            Observable.just("end").doOnSubscribe(disposable1 -> Log.d("Subscribe", "end"))
    ))
            .subscribe(s -> Log.d("Result", s));
    
    // 2019-01-04 12:45:13.364 31887-31887/ D/Subscribe: start
    // 2019-01-04 12:45:13.364 31887-31887/ D/Result: start
    // 2019-01-04 12:45:13.364 31887-31887/ D/Subscribe: 0
    // 2019-01-04 12:45:16.366 31887-32529/ D/Result: 0
    // 2019-01-04 12:45:16.367 31887-32529/ D/Subscribe: 1
    // 2019-01-04 12:45:18.367 31887-32535/ D/Result: 1
    // 2019-01-04 12:45:18.368 31887-32535/ D/Subscribe: 2
    // 2019-01-04 12:45:19.369 31887-32537/ D/Result: 2
    // 2019-01-04 12:45:19.371 31887-32537/ D/Subscribe: end
    // 2019-01-04 12:45:19.372 31887-32537/ D/Result: end
    

    There are other operators that make processes sequential as well.

    A more appropriate question is why your API looks like this or what exactly you want to achieve, e.g.

    • Is it to conform to a specified external API?
    • Is the process controlled by an outside entity?
    • Can it be simplified on the outside API level (i.e. a single method like .getMeasurementData(device, callbackForResult))?
    • Is the bluetooth communication process parametrisable or is the algorithm always the same?

    In my experience RxJava2 does not need to have an additional state machine as it is already a library that handles state. Once the requirements are known it is possible to model the chain of RxJava2 operators in a way that will do exactly what is needed.

    Is it possible to check if the notification is already setup (so that I can skip this step to go on to my next machine state)?

    No, there is no way to check it — the caller however knows if it was setup already.