
Sending a list of commands to a device using RxAndroidBle (RxJava)


I am trying to send a list of commands to a device via RxJava. Here is my code:

public void startWriteCommucation(final ArrayList<byte[]> b) {
    if (isConnected()){
            connectionObservable
                    .flatMap(new Func1<RxBleConnection, Observable<Observable<byte[]>>>() {
                        @Override
                        public Observable<Observable<byte[]>> call(final RxBleConnection rxBleConnection) {
                            final List<Observable<byte[]>> list = new ArrayList<>();
                            for (byte[] bytes: b){
                                Log.e("Observer", Arrays.toString(bytes));
                                list.add(rxBleConnection
                                        .writeCharacteristic(BleDevice.characteristicWrite, bytes));
                            }
                            return Observable.from(list);
                        }
                    })
                    .concatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
                        @Override
                        public Observable<byte[]> call(Observable<byte[]> observable) {
                            return observable;
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<byte[]>() {
                        @Override
                        public void call(byte[] bytes) {
                            view.setTextStatus("Write success");
                            Log.e("Subscriber", Arrays.toString(bytes));
                        }
                    });
        }
}

It works when I click the button once. For example, here is my click handler:

 public void onClick(){
        ArrayList<byte[]> listCmd = new ArrayList<>();
        listCmd.add(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
        listCmd.add(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
        startWriteCommucation(listCmd);
}

And my logs in Logcat:

E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

E/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

But a problem occurs when I quickly double-click the button. While the observable from the first click is still working, the second click calls the startWriteCommucation() method again. After this my logs look like this:

 E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
 E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

 E/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
 E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
 E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

The main problem is that the writes are not in order, so my device does not work correctly. Can you help me find the problem?


Solution

  • The problem is a combination of a RxAndroidBle library bug (which makes responses not match requests) and sharing a connection between two stateful communication flows (each requires making two writes in order without any other communication in between).

    The bug: the value (byte[]) that is supposed to be written to a BluetoothGattCharacteristic is set too early. If there are two parallel writers for the same characteristic, one of them can overwrite the byte[] set by the other due to a race condition. I have made a fix to the library, which is now in code review and should be applied to the SNAPSHOT release in the near future.

    With the changes the output will look like this:

    D/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    D/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    D/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    D/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    
    D/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    D/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    D/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    D/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
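
    To make the race described above concrete, here is a simplified plain-Java model of it. It is only a sketch: FakeCharacteristic, the operation queue, and the setValueWhenQueued flag are hypothetical stand-ins, not RxAndroidBle's real internals, and the commands are shortened to three bytes. Each flow queues its second write only after its first has been transmitted, as in the question's concatMap() flow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Simplified, hypothetical model of the race; not RxAndroidBle's real internals. */
final class WriteRaceSketch {

    /** Stand-in for a BluetoothGattCharacteristic: a single shared value slot. */
    static final class FakeCharacteristic {
        byte[] value;
    }

    /**
     * Simulates two flows that each write the same two commands, where a flow
     * queues its next write only after its previous write has been transmitted.
     * Returns the payloads in the order they were "transmitted".
     */
    static List<String> simulate(boolean setValueWhenQueued) {
        byte[][] commands = {{1, 2, 3}, {0, 0, 0}};
        FakeCharacteristic characteristic = new FakeCharacteristic();
        Queue<Runnable> operationQueue = new ArrayDeque<>();
        List<String> transmitted = new ArrayList<>();

        // Two quick clicks: both flows queue their first write before anything runs.
        for (int flow = 0; flow < 2; flow++) {
            enqueueWrite(commands, 0, characteristic, operationQueue, transmitted, setValueWhenQueued);
        }
        // The radio executes one queued operation at a time.
        while (!operationQueue.isEmpty()) {
            operationQueue.poll().run();
        }
        return transmitted;
    }

    private static void enqueueWrite(byte[][] commands, int index, FakeCharacteristic characteristic,
                                     Queue<Runnable> queue, List<String> transmitted,
                                     boolean setValueWhenQueued) {
        if (index >= commands.length) {
            return; // this flow has sent all of its commands
        }
        byte[] data = commands[index];
        if (setValueWhenQueued) {
            // Buggy variant: the shared slot is overwritten while other writes are still pending.
            characteristic.value = data;
        }
        queue.add(() -> {
            if (!setValueWhenQueued) {
                // Fixed variant: the slot is set right before transmission.
                characteristic.value = data;
            }
            transmitted.add(Arrays.toString(characteristic.value));
            enqueueWrite(commands, index + 1, characteristic, queue, transmitted, setValueWhenQueued);
        });
    }

    public static void main(String[] args) {
        System.out.println("set when queued:   " + simulate(true));
        System.out.println("set when executed: " + simulate(false));
    }
}
```

    In this model, setting the value at queue time yields the 1, 0, 0, 0 pattern from the question's log, while setting it at execution time yields 1, 1, 0, 0 as in the output above. Note that even the second ordering still interleaves the two flows, which is why the connection sharing also has to be addressed.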
    

    Possible solution

    If you do not want the flow to fire twice when the user taps the button twice quickly, you can create a shareable flow:

    Observable<byte[]> theSharedFlow = rxBleConnection
      .writeCharacteristic(uuid, data1)
      .flatMap(writtenBytes -> rxBleConnection.writeCharacteristic(uuid, data2))
      .share();
    

    When subscribed multiple times, this flow is executed only once until it finishes. In the above snippet the second writeCharacteristic() is subscribed (and queued for communication) after the first one emits the written bytes.
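
    The "executed only once until it finishes" behaviour can be illustrated outside RxJava with a small plain-Java analogy. This is a sketch under assumptions, not how share() is implemented: SharedInFlight is a hypothetical helper built on CompletableFuture, and unlike share() it does not cancel the work when all subscribers go away.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper: callers arriving while an operation is running share its result. */
final class SharedInFlight<T> {
    private final Supplier<CompletableFuture<T>> starter;
    private CompletableFuture<T> inFlight;

    SharedInFlight(Supplier<CompletableFuture<T>> starter) {
        this.starter = starter;
    }

    /** Starts the operation only if no previous run is still pending. */
    synchronized CompletableFuture<T> run() {
        if (inFlight == null || inFlight.isDone()) {
            inFlight = starter.get();
        }
        return inFlight;
    }

    public static void main(String[] args) {
        int[] starts = {0};
        // Stand-in for the two chained writes; it stays pending in this demo.
        CompletableFuture<String> pendingWrites = new CompletableFuture<>();
        SharedInFlight<String> flow = new SharedInFlight<>(() -> {
            starts[0]++;
            return pendingWrites;
        });

        CompletableFuture<String> firstTap = flow.run();
        CompletableFuture<String> secondTap = flow.run();
        System.out.println(firstTap == secondTap); // prints "true"
        System.out.println(starts[0]);             // prints "1"
    }
}
```

    A second tap that arrives while the first run is pending simply receives the same pending result, so the command sequence is started only once.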

    If the application is meant to send arbitrary sets of commands in order at arbitrary times while sharing a connection, then it is up to the application to make sure that the previous set has finished.
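
    One way an application might make sure the previous set has finished is to chain every new set behind the previous one. The following is a minimal plain-Java sketch of that idea, not RxAndroidBle API: BatchSerializer and the writer function are hypothetical, with the writer standing in for whatever performs a single characteristic write and signals its completion.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical helper: runs command sets strictly one after another. */
final class BatchSerializer {
    // Completion of the most recently submitted set.
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    /** Queues a set of commands behind all previously submitted sets. */
    synchronized CompletableFuture<Void> submit(List<byte[]> batch,
                                                Function<byte[], CompletableFuture<Void>> writer) {
        // exceptionally(...) lets later sets run even if an earlier one failed.
        tail = tail.exceptionally(error -> null)
                   .thenCompose(ignored -> writeAll(batch, writer));
        return tail;
    }

    /** Writes the commands of one set in order, each after the previous completes. */
    private static CompletableFuture<Void> writeAll(List<byte[]> batch,
                                                    Function<byte[], CompletableFuture<Void>> writer) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (byte[] command : batch) {
            chain = chain.thenCompose(ignored -> writer.apply(command));
        }
        return chain;
    }

    public static void main(String[] args) {
        BatchSerializer serializer = new BatchSerializer();
        // Fake writer that logs and completes immediately; a real one would finish asynchronously.
        Function<byte[], CompletableFuture<Void>> fakeWriter = data -> {
            System.out.println("write " + Arrays.toString(data));
            return CompletableFuture.completedFuture(null);
        };
        serializer.submit(Arrays.asList(new byte[]{1, 2, 3}, new byte[]{0, 0, 0}), fakeWriter);
        serializer.submit(Arrays.asList(new byte[]{1, 2, 3}, new byte[]{0, 0, 0}), fakeWriter);
    }
}
```

    With a writer that completes asynchronously, the second set's first write is not started until the first set's last write has completed, so the two sets never interleave.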

    I hope that I have answered your question. If you provide more information about the use case, I will try to improve my answer.

    Best Regards

    Edit:

    Alternative solution:

    To preserve the order, all Observables need to be subscribed in the order in which their results should arrive. By the Observable contract, a cold Observable is not executed until it is subscribed. And when using flatMap(), the second Observable is subscribed only once the first one has emitted.

    For both writes to be transmitted in order they have to be subscribed in that same order, so the flow could look like this:

    connectionObservable
                .flatMap(rxBleConnection -> {
                    Observable<byte[]> mergedObservable = null;
                    for (byte[] bytes : b) {
                        Log.d("Observer", Arrays.toString(bytes));
                        final Observable<byte[]> writeObservable = rxBleConnection
                                .writeCharacteristic(uuid, bytes);
    
                        if (mergedObservable == null) {
                            mergedObservable = writeObservable;
                        } else {
                            // merging two Observables to be subscribed at the same time when subscribed
                            mergedObservable = mergedObservable.mergeWith(writeObservable);
                        }
                    }
                    return mergedObservable;
                })
                // removed .concatMap()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                        bytes -> Log.d("Subscriber", Arrays.toString(bytes)),
                        throwable -> Log.e("Subscriber", "error", throwable)
                );
    

    RxJava obviously has more ways to achieve the same behaviour, but that is outside the scope of this question.