kotlin rx-java rx-java2

RxJava CompletableSubject toFlowable/toObservable doesn't emit


I have a producer/publisher that is a CompletableSubject.

I want a read-only version of it for the subscribers/observers to use, but neither toFlowable() nor toObservable() emits anything.

What am I missing?

import io.reactivex.subjects.CompletableSubject

fun main() {
    val publisher = CompletableSubject.create()

    val readOnlyStream = publisher.toFlowable<Any>()

    println("1 ${publisher.hasComplete()}")
    readOnlyStream.subscribe { item ->
        println("yay, got it $item")
    }

    println("2 ${publisher.hasComplete()}")
    publisher.onComplete()
    println("3 ${publisher.hasComplete()}")

    Thread.sleep(3000L)
}

outputs:

1 false
2 false
3 true

Weird workaround I found

import io.reactivex.subjects.CompletableSubject

fun main() {
    val publisher = CompletableSubject.create()

    val readOnlyStream = publisher.toSingle {
        "doneee"
    }

    println("1 ${publisher.hasComplete()}")
    readOnlyStream.subscribe { item ->
        println("yay, got it $item")
    }

    println("2 ${publisher.hasComplete()}")
    publisher.onComplete()
    println("3 ${publisher.hasComplete()}")

    Thread.sleep(3000L)
}

outputs:

1 false
2 false
yay, got it doneee
3 true

Solution

  • As akarnokd said in the comments, readOnlyStream.subscribe { item -> ... } only registers a callback for items (onNext), and no item is ever emitted. That does not mean the onComplete event failed to reach the Flowable. You can subscribe to the onComplete event like this:

    readOnlyStream.subscribe(
        {}, // onNext, called for each item
        {}, // onError, called once when there is an error
        { println("yay, got onComplete") }, // onComplete, called once when completed
    )
    

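    CompletableSubject's own subscribe method takes no onNext callback, because a Completable never emits items, only an onError or onComplete event. This is also why the toSingle workaround prints something: toSingle supplies a value when the Completable completes, so the single-argument callback fires.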
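
If all the subscribers need is the completion signal, one option is to skip the Flowable conversion and expose the subject as a plain Completable. The following is a minimal sketch, assuming RxJava 2 (the `io.reactivex` packages) at a version where `Completable.hide()` is available; `collectEvents` is a hypothetical helper introduced here only so the received events can be inspected.

```kotlin
import io.reactivex.Completable
import io.reactivex.subjects.CompletableSubject

// Hypothetical helper (not part of RxJava): records which terminal
// event the read-only view delivers to its subscriber.
fun collectEvents(): List<String> {
    val events = mutableListOf<String>()
    val publisher = CompletableSubject.create()

    // hide() returns a plain Completable, so consumers cannot cast it
    // back to the subject and call onComplete()/onError() themselves.
    val readOnly: Completable = publisher.hide()

    // Completable.subscribe takes (onComplete, onError); there is no onNext.
    readOnly.subscribe(
        { events += "onComplete" },
        { error -> events += "onError: $error" }
    )

    // The subject completes synchronously on the calling thread.
    publisher.onComplete()
    return events
}

fun main() {
    println(collectEvents()) // prints [onComplete]
}
```

Because the subject delivers its terminal event synchronously, no Thread.sleep is needed here. If downstream code really does need a Flowable or Observable, toFlowable()/toObservable() still work, as long as the subscriber registers an onComplete callback.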