android rx-java2 rxandroidble rxbluetooth

Zip list of observables into another Zip observable RxJava2


I am trying to zip a list of zipped Observables, but the issue is that I get the same values from every zipped Observable each time. I am doing this because I need to perform two operations, first reading the index and second reading the data from BLE, a certain number of times (6 times in the following example).

I am not sure how to handle this with RxJava2.

Here is the code snippet:

    private Observable<Pair<byte[], byte[]>> getValueFromIndication(RxBleConnection rxBleConnection) {
        // Signals both indication streams to stop once one pair has been produced
        final PublishSubject<Boolean> unsubscribeSubject = PublishSubject.create();

        return Observable.zip(
                rxBleConnection.setupIndication(Data.INDEX, NotificationSetupMode.QUICK_SETUP).flatMap(it -> it).takeUntil(unsubscribeSubject),
                rxBleConnection.setupIndication(Data.DATA, NotificationSetupMode.QUICK_SETUP).flatMap(it -> it).takeUntil(unsubscribeSubject),
                (bytes, bytes2) -> {
                    unsubscribeSubject.onNext(true);
                    return Pair.create(bytes, bytes2);
                }
        );
    }

In my main stream, I first create the list of Observables, zip it, and pass it on:

    .flatMap(rxBleConnection -> {
        List<Observable<Pair<byte[], byte[]>>> observableList = new ArrayList<>();

        // Build a list of 6 observables so that getValueFromIndication() fires 6 times
        for (int i = 0; i < 6; i++) {
            observableList.add(getValueFromIndication(rxBleConnection));
        }

        // Zip the list of zipped observables
        return Observable.zip(observableList, Data::OperationReadings);
    }).subscribe(bytes -> {

    })

Here, I always get the same values in Data::OperationReadings. Currently, I am getting the following data which I don't want.

Each time, the same index and value:

INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]

The expected data is as follows

Each time, a different index and value:

INDEX [1] DATA [10,30,20,30,33,0]
INDEX [2] DATA [11,11,2,0,3,0]
INDEX [3] DATA [0,0,0,0,33,0]
INDEX [4] DATA [10,30,0,30,3,0]
INDEX [5] DATA [10,0,0,30,3,0]
INDEX [6] DATA [10,0,20,30,3,9]

Solution

  • The reason you get the same data repeated 6 times is that you subscribe to all of the individual getValueFromIndication() Observables at the same time. Effectively, every Observable runs in parallel, so each one picks up the same indication. You want to run each subscription in sequence instead. One solution is to replace this:

            return Observable.zip(observableList,Data::OperationReadings);
    

    with:

            return Observable.concat(observableList) // subscribe to each Observable in the list only after the previous one completes
                .toList() // gather the results of all the individual Observables; this returns a Single
                .toObservable() // convert back to an Observable so the types match
                .map(Data::OperationReadings); // map the results into the OperationReadings class (it receives a List here, not the Object[] that zip passes)