
Blocking RxAndroidBle write operation

How can I perform a blocking write operation on Android with RxAndroidBle? The next command should only be performed if the write operation was successful.

protected void doWriteBytes(UUID characteristic, byte[] bytes) {
    final Disposable disposable = connectionObservable
            .flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
            .subscribe(
                    value -> {
                        Timber.d("Write characteristic %s: %s", characteristic, Arrays.toString(value));
                    },
                    throwable -> onError(throwable)
            );
}


protected void test() {
    // blocking write bytes
    doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12});

    // the following command should only be performed if doWriteBytes was executed successfully

    // blocking write bytes
    doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1});
}


I know about subscribe and onComplete, but is it also possible to do this without these methods?

The background is that I want to override the test method in several different subclasses, so that I can perform various doWriteBytes commands (e.g. ACK commands) to send some bytes to a Bluetooth device, but I need to be sure that the next command is only performed if the ACK command was sent successfully.

Maybe this is more of an RxJava2 problem, but I am not very familiar with it.


Thanks for your answer, @Dariusz Seweryn. Sorry, my question was probably not very clear. I will try to make it more concrete.

I want to write the code in test() like a normal function, abstracting away the RxJava2 implementation. The only difference is that doWriteBytes and the other Bluetooth operations (notification, read) should be done via RxAndroidBle. What I have to write to a Bluetooth device depends on the notification bytes or on some other algorithm in the test() method. Additionally, I want to override the test() method to implement a different Bluetooth communication flow for a completely different Bluetooth device. It is always important that the Bluetooth operations are processed sequentially.

Now I have three ideas in mind:

1) My first idea is to implement all RxAndroidBle operations as blocking calls, so that I can use simple constructs such as loops.

2) My second idea is to dynamically add (concat?) observables to one another at runtime in the test() method so that they are processed sequentially, but I always need the return values from the previous observables.

3) My third idea is to combine a write/notify/write operation into one method that I can call in the test() method. The operation should write bytes to characteristic A, wait for the notification on characteristic B, do some processing with the received bytes and then write again to characteristic C. But what is written, and how the notification is processed, should be added dynamically at runtime in the test() method.

Maybe there is an elegant solution for my problem in RxJava2, or is it not possible at all?


I tried to implement all three ideas, but unfortunately I didn't succeed.


1)

connectionObservable
        .flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
        .blockingSubscribe(
                value -> {
                    Timber.d("Write characteristic %s: %s", characteristic, Arrays.toString(value));
                },
                throwable -> onError(throwable)
        );

It is always blocking, even on success. Do I have to release it somewhere? Additionally, the method now returns void instead of a Disposable, so I can't dispose of it.

2) I am struggling with this idea. Which observable should I concat to if I don't know the starting observable? The connectionObservable doesn't work because it holds the RxBleConnection. The second problem is that the values after a Bluetooth operation are then of the Java Object class. Do I have to cast them every time? Do you have an example of how I can concat, for example, a Bluetooth write operation to a Bluetooth notification result?

3) The problem with this idea is that I don't know how to dynamically add the processing part to the notification at runtime, outside of the RxJava subscribe part.

I have a working solution for idea no. 3:

protected Observable<byte[]> doWriteNotify(UUID characteristic, byte[] bytes, Observable<byte[]> notificationObservable) {
    // Write first, then switch to the notification stream once the write has succeeded.
    Observable<byte[]> observable = connectionObservable
            .flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
            .flatMap(writeBytes -> notificationObservable);

    return observable;
}

By the way, should I create separate questions on Stack Overflow for these?

If it helps, you can find my experimental source code here.


  • I know about subscribe and onComplete, but is it also possible to do this without these methods?


    Completable foo() { ... }
    Completable bar() { ... }

    One could do:

    Disposable testDisposable = connectionObservable
            .flatMapCompletable(connection ->
                    connection.writeCharacteristic(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12}).ignoreElement()
                            .andThen(connection.writeCharacteristic(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1}).ignoreElement())
            )
            .subscribe(
                    () -> { /* completed */ },
                    throwable -> { /* error happened */ }
            );

    By closing the whole chain with a single .subscribe(), one controls over how many connections the flow runs: here it is bound to one connection. In the above example, if the connection ends prematurely during the first write, none of the following operations (foo(), write, bar()) will happen at all.
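The "next step only after success" behaviour described above can be tried out without a Bluetooth device. The following is a minimal sketch in plain Java that uses CompletableFuture as a stand-in for the RxJava chain (thenCompose plays roughly the role of andThen); it does not use RxJava or RxAndroidBle, and FakeConnection and writeBoth are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Plain-Java stand-in for the chained writes; no RxJava or Bluetooth involved. */
class SequentialWritesSketch {

    /** Hypothetical fake connection that records writes instead of talking to a device. */
    static class FakeConnection {
        final List<byte[]> written = new ArrayList<>();
        int attempts = 0;
        private final boolean failFirstWrite;

        FakeConnection(boolean failFirstWrite) {
            this.failFirstWrite = failFirstWrite;
        }

        /** Completes with the written bytes, or fails on the first attempt if configured to. */
        CompletableFuture<byte[]> write(byte[] bytes) {
            attempts++;
            if (failFirstWrite && attempts == 1) {
                CompletableFuture<byte[]> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException("connection lost"));
                return failed;
            }
            written.add(bytes);
            return CompletableFuture.completedFuture(bytes);
        }
    }

    /** The second write is only started after the first one has completed successfully. */
    static CompletableFuture<byte[]> writeBoth(FakeConnection connection) {
        return connection.write(new byte[] {0x35, 0x12})
                .thenCompose(ignored -> connection.write(new byte[] {0x5, 0x6, 0x1}));
    }

    public static void main(String[] args) {
        FakeConnection healthy = new FakeConnection(false);
        writeBoth(healthy).join();
        System.out.println("writes on healthy connection: " + healthy.written.size());

        FakeConnection broken = new FakeConnection(true);
        writeBoth(broken)
                .exceptionally(error -> null) // swallow the error so join() does not throw
                .join();
        System.out.println("write attempts on broken connection: " + broken.attempts);
    }
}
```

On the broken connection only one write is ever attempted, because the failure of the first step short-circuits the rest of the chain. The same reasoning applies to the RxJava chain above, where an error in the first write skips everything after it and ends up in the error callback of subscribe.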


    All your ideas could potentially work — you can try them out.
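As one way to picture the third idea (write, wait for a notification, process, write again), here is a minimal plain-Java sketch in which the processing step is passed in as a function, so each subclass could supply its own. It uses CompletableFuture rather than RxJava, and FakeDevice, nextNotification and writeNotifyWrite are hypothetical names made up for the example, not part of RxAndroidBle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Plain-Java model of a write/notify/write step with a pluggable processor. */
class WriteNotifyWriteSketch {

    /** Hypothetical fake device: logs every call and answers with a canned notification. */
    static class FakeDevice {
        final List<String> log = new ArrayList<>();
        private final byte[] cannedNotification;

        FakeDevice(byte[] cannedNotification) {
            this.cannedNotification = cannedNotification;
        }

        CompletableFuture<byte[]> write(String characteristic, byte[] bytes) {
            log.add("write " + characteristic + " " + Arrays.toString(bytes));
            return CompletableFuture.completedFuture(bytes);
        }

        CompletableFuture<byte[]> nextNotification(String characteristic) {
            log.add("notify " + characteristic);
            return CompletableFuture.completedFuture(cannedNotification);
        }
    }

    /** Writes to A, waits for a notification on B, processes it, then writes the result to C. */
    static CompletableFuture<byte[]> writeNotifyWrite(
            FakeDevice device, byte[] request, Function<byte[], byte[]> processor) {
        return device.write("A", request)
                .thenCompose(written -> device.nextNotification("B"))
                .thenApply(processor)
                .thenCompose(response -> device.write("C", response));
    }

    public static void main(String[] args) {
        FakeDevice device = new FakeDevice(new byte[] {0x10});
        // The processor is chosen by the caller at runtime; this one increments the first byte.
        byte[] result = writeNotifyWrite(
                device, new byte[] {0x01}, bytes -> new byte[] {(byte) (bytes[0] + 1)}).join();
        System.out.println(Arrays.toString(result));
        device.log.forEach(System.out::println);
    }
}
```

Because every step returns a typed future, no casting is needed between the steps. In an RxJava version the same shape would presumably be built from flatMap and map on typed Observable<byte[]> or Single<byte[]> values.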

    Maybe there is an elegant solution for my problem in RxJava2 or it is not possible at all?

    There are .blocking*() functions on the Observable/Single/Completable classes if you really need them. Be careful though: you may start experiencing some hard-to-debug issues, depending on your implementation, because blocking introduces more state into your application.
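One of the pitfalls of blocking is that a call waits forever if the underlying operation never terminates. The sketch below illustrates this in plain Java, with CompletableFuture standing in for a Single and a timeout bounding the wait. The await helper is a hypothetical name introduced for this example; the RxJava counterparts would be the .blocking*() functions mentioned above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Plain-Java illustration of blocking on an asynchronous result with a timeout. */
class BlockingWithTimeoutSketch {

    /** Blocks the calling thread until the operation finishes or the timeout elapses. */
    static byte[] await(CompletableFuture<byte[]> operation, long timeoutMillis) {
        try {
            return operation.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("operation did not finish in time", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("operation failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // An operation that has already finished returns immediately.
        byte[] ack = await(CompletableFuture.completedFuture(new byte[] {0x35, 0x12}), 100);
        System.out.println("received " + ack.length + " bytes");

        // An operation that never finishes would block forever without the timeout.
        try {
            await(new CompletableFuture<>(), 100);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a helper like this, a test() method could be written as a plain sequence of calls, at the cost of tying up the calling thread for the duration of each operation, so it should not run on the Android main thread.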