kotlin rx-java2

Why don't operators downstream from Completable (andThen) run?


Why don't the operators after andThen get called? I have narrowed the issue down to the difference between chaining from a Completable with andThen and using a Single directly.

Doesn't work:

import io.reactivex.Completable
import io.reactivex.Single

Completable.complete() // or in my case, heavyProcessWhichReturnsCompletable()
        .andThen { println("SUCCESS") } // This runs, but nothing after it
        .andThen { println("FAILURE") } // doesn't print
        .toSingle { "FAILURE" }.subscribe { it ->
            print("Result: $it") // doesn't print
        }

Works:

Single.just("SUCCESS").subscribe { it ->
    print("Result: $it") // prints!
}

Solution

  • Thanks a lot to @akarnokd for explaining the issue. Naively, I was going to blame RxJava for this, but I didn't want to turn into this man, so I thought about it a bit more. It's my obligation to understand the code I write 😅. This blog post was also instrumental in my understanding.


    It is because andThen takes a CompletableSource argument. When you pass a lambda (curly braces) instead, Kotlin converts it into a CompletableSource whose subscribe method is the lambda body. The body runs, but it never calls onComplete on the observer it receives, so that Completable never completes and no downstream operators run. Completable.complete() is a CompletableSource that does complete, so in the chain below the first println runs but the second does not:

    Completable.complete()
        .andThen(Completable.complete())
        .andThen { println("runs") } // runs, but never calls onComplete
        .andThen { println("doesn't run") } // doesn't run, because the previous step never completed
        .subscribe()
    

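    So above, I was able to use andThen twice successfully. The reason is that Completable.complete() is a well-behaved CompletableSource that signals completion, but the println lambda never calls onComplete, so nothing after it is ever subscribed.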
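The stalling behaviour can be reproduced without RxJava at all. The sketch below is a deliberately simplified model, not RxJava's actual implementation: DoneObserver, DoneSource, andThen and runChain are hypothetical names standing in for CompletableObserver, CompletableSource and the real operator. It only illustrates that the next source is subscribed from inside the previous source's completion callback, so a source that never calls onComplete blocks everything after it.

```kotlin
// Hypothetical stand-ins for CompletableObserver / CompletableSource (not RxJava classes).
fun interface DoneObserver { fun onComplete() }
fun interface DoneSource { fun subscribe(observer: DoneObserver) }

// Simplified "andThen": `next` is subscribed only when `first` reports completion.
fun andThen(first: DoneSource, next: DoneSource) = DoneSource { downstream ->
    first.subscribe { next.subscribe(downstream) }
}

// Builds a three-step chain and returns the order in which the steps actually ran.
fun runChain(): List<String> {
    val log = mutableListOf<String>()
    val completes = DoneSource { observer -> log += "A"; observer.onComplete() }
    val neverCompletes = DoneSource { log += "B" } // body runs, completion is never signalled
    val last = DoneSource { observer -> log += "C"; observer.onComplete() }

    andThen(andThen(completes, neverCompletes), last).subscribe { log += "done" }
    return log
}

fun main() {
    println(runChain()) // [A, B]
}
```

Step B plays the role of the bare println lambda: it executes, but because it ignores its observer, step C and the final callback are never reached.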

    I've written an explanation in each print statement as to why it runs or doesn't:

    Completable.complete() // or in my case, heavyProcessWhichReturnsCompletable()
            .andThen(Completable.fromAction { println("SUCCESS") })
            .andThen { it ->
                println("Success, because fromAction completes after its code runs if no exception is raised")
                it.onComplete() // explicitly signal completion to the downstream observer
            }
            .andThen { println("Success, because the previous completable completed") }
            .andThen { println("Won't print here, because the previous one didn't complete") }
            .subscribe()
    

    Warning: this won't work as intended.

    Completable.complete()
        .andThen { Completable.complete() } // never completes: the lambda's parameter (the observer) is never called, and the Completable created here is discarded
        .andThen { println("Failure") } // never runs
        .toSingle { println("Failure") } // never runs
        .subscribe()
    

    Fix: either use parentheses or call onComplete:

    Completable.complete()
            .andThen(Completable.complete()) // Completes
            .andThen { println("Success"); it.onComplete() } // Completes
            .toSingle { println("Success") }
            .subscribe()
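To check which chains really complete, one option is RxJava 2's test() helper, which returns a TestObserver. The sketch below assumes RxJava 2 is on the classpath; the function name verifyChains is made up for illustration. It writes the SAM conversion out explicitly as CompletableSource { ... } so it is visible what the bare lambda turns into. The Completable.defer case is not from the original answer; it is included as one possible way to keep the "create the Completable lazily" intent of the broken example.

```kotlin
import io.reactivex.Completable
import io.reactivex.CompletableSource

// Hypothetical helper name; each block mirrors one of the cases discussed above.
fun verifyChains() {
    // Runs its body but never signals completion, so the chain stalls here.
    Completable.complete()
        .andThen(CompletableSource { println("side effect only") })
        .test()
        .assertNotComplete()

    // fromAction runs the block and then signals onComplete on our behalf.
    Completable.complete()
        .andThen(Completable.fromAction { println("fromAction") })
        .andThen(Completable.fromAction { println("downstream runs") })
        .test()
        .assertComplete()

    // Calling onComplete by hand also lets the chain continue into toSingle.
    Completable.complete()
        .andThen(CompletableSource { observer ->
            println("manual")
            observer.onComplete()
        })
        .toSingle { "done" }
        .test()
        .assertValue("done")

    // Assumption: defer is used here to build the inner Completable at subscription time.
    Completable.complete()
        .andThen(Completable.defer { Completable.complete() })
        .test()
        .assertComplete()
}

fun main() {
    verifyChains()
}
```

Of these, Completable.fromAction is probably the least error-prone choice when the step is just a side effect, since there is no observer to forget about.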