android rx-java2 rxandroidble

RxJava take(1) completes whereas Observable.just() does not


I use RxAndroidBle to connect to Bluetooth devices. I call establishConnection to get the connection Observable and want to convert this Observable to a Completable. This code works, and the Completable completes as expected:

connectionObservable
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext {
                ...
                startReadingData()
            }
            .doOnError { ... }
            .take(1)
            .ignoreElements()

whereas this never completes:

connectionObservable
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext {
                ...
                startReadingData()
            }
            .doOnError { ... }
            .flatMap { Observable.just(it) }
            .ignoreElements()  // flatMapCompletable { Completable.complete() } doesn't work either

So I'm asking purely out of interest: why does flatMap with Observable.just() not work, given that Observable.just() also completes immediately?


Solution

  • Problem

    Never completes:

    connectionObservable
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext {
                    ...
                    startReadingData()
                }
                .doOnError { ... }
                .flatMap { Observable.just(it) }
                .ignoreElements()  // flatMapCompletable { Completable.complete() } doesn't work either
    

    The reason is simple. The connectionObservable is probably infinite: it calls onNext but never onComplete. The downstream operators receive each onNext and process it. The flatMap operator completes only when the upstream and all inner streams have completed. Here the inner stream (Observable.just) completes, but the source observable does not, so no terminal event ever reaches the subscriber.
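    The completion rule described above can be sketched with a toy model in plain Java. This is not RxJava's actual implementation; the class FlatMapCompletionModel and its method names are hypothetical and exist only to illustrate the bookkeeping: the downstream sees onComplete only once the upstream is done and no inner stream is still active.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of flatMap's completion rule (illustrative only, not RxJava code). */
final class FlatMapCompletionModel {
    private boolean upstreamDone = false; // set when the source calls onComplete
    private int activeInners = 0;         // inner streams that have not completed yet
    final List<String> downstream = new ArrayList<>();

    /** Upstream item mapped to a one-item inner stream, like Observable.just(item). */
    void upstreamNext(String item) {
        activeInners++;                   // subscribe to the inner stream
        downstream.add("onNext:" + item); // the inner stream emits its single item
        activeInners--;                   // ... and completes immediately
        completeIfFinished();
    }

    /** Upstream signalled onComplete. */
    void upstreamComplete() {
        upstreamDone = true;
        completeIfFinished();
    }

    /** Both conditions must hold before the downstream is completed. */
    private void completeIfFinished() {
        if (upstreamDone && activeInners == 0) {
            downstream.add("onComplete");
        }
    }

    public static void main(String[] args) {
        // Infinite source: one item, no onComplete.
        FlatMapCompletionModel infinite = new FlatMapCompletionModel();
        infinite.upstreamNext("connection");
        System.out.println(infinite.downstream); // [onNext:connection]

        // Finite source: one item, then onComplete.
        FlatMapCompletionModel finite = new FlatMapCompletionModel();
        finite.upstreamNext("connection");
        finite.upstreamComplete();
        System.out.println(finite.downstream); // [onNext:connection, onComplete]
    }
}
```

    In the first case the inner stream has finished, yet nothing completes the downstream because the source is still open, which matches the behaviour in the question.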

    Completes:

    connectionObservable
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext {
                    ...
                    startReadingData()
                }
                .doOnError { ... }
                .take(1)
                .ignoreElements()
    

    This stream completes because take(1) limits it. What does the take operator do? It waits for an onNext from the source, forwards that item downstream, and then emits onComplete itself (disposing the upstream subscription), whether or not the source ever completes. You could add the flatMap with Observable.just as the inner stream after the take operator and it would still complete.

    Take operator implementation

        @Override
        public void onNext(T t) {
            if (!done && remaining-- > 0) {
                boolean stop = remaining == 0;
                downstream.onNext(t);
                if (stop) {
                    onComplete();
                }
            }
        }
    

    This is how the take operator's onNext is implemented in RxJava 2. It shows that an upstream onNext results in a downstream onNext and, once the requested count is reached, a downstream onComplete.
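    To see that logic in action without the library, here is a minimal sketch in plain Java. MiniObserver, Recorder, Take and emitOnceAndNeverComplete are hypothetical stand-ins written for this example, not RxJava types. The Take class reuses the counting logic of the onNext quoted above but leaves out disposal of the upstream, which the real operator also performs.

```java
import java.util.ArrayList;
import java.util.List;

final class TakeOneDemo {
    /** Minimal stand-in for an observer (hypothetical, for illustration only). */
    interface MiniObserver<T> {
        void onNext(T t);
        void onComplete();
    }

    /** Records every signal it receives. */
    static final class Recorder implements MiniObserver<String> {
        final List<String> events = new ArrayList<>();
        @Override public void onNext(String t) { events.add("onNext:" + t); }
        @Override public void onComplete() { events.add("onComplete"); }
    }

    /** Forwards at most `count` items, then completes the downstream on its own. */
    static final class Take<T> implements MiniObserver<T> {
        private final MiniObserver<T> downstream;
        private long remaining;
        private boolean done;

        Take(MiniObserver<T> downstream, long count) {
            this.downstream = downstream;
            this.remaining = count;
        }

        @Override public void onNext(T t) {
            if (!done && remaining-- > 0) {
                boolean stop = remaining == 0;
                downstream.onNext(t);
                if (stop) {
                    onComplete(); // limit reached: complete without waiting for the source
                }
            }
        }

        @Override public void onComplete() {
            if (!done) {
                done = true;
                downstream.onComplete();
            }
        }
    }

    /** A source that emits one item and then stays silent, like an open connection. */
    static void emitOnceAndNeverComplete(MiniObserver<String> observer) {
        observer.onNext("connection");
    }

    public static void main(String[] args) {
        Recorder withoutTake = new Recorder();
        emitOnceAndNeverComplete(withoutTake);
        System.out.println(withoutTake.events); // [onNext:connection]

        Recorder withTake = new Recorder();
        emitOnceAndNeverComplete(new Take<>(withTake, 1));
        System.out.println(withTake.events); // [onNext:connection, onComplete]
    }
}
```

    The source behaves identically in both runs; only the Take wrapper adds the onComplete that ignoreElements() needs in order to finish.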