I have a producer/publisher that is a CompletableSubject.
I want a read-only version of it for the subscribers/observers to use,
but neither toFlowable()
nor toObservable()
emits anything.
What am I missing?
fun main() {
    val publisher = CompletableSubject.create()
    val readOnlyStream = publisher.toFlowable<Any>()
    println("1 ${publisher.hasComplete()}")
    readOnlyStream.subscribe { item ->
        println("yay, got it $item")
    }
    println("2 ${publisher.hasComplete()}")
    publisher.onComplete()
    println("3 ${publisher.hasComplete()}")
    Thread.sleep(3000L)
}
outputs:
1 false
2 false
3 true
A weird workaround I found:
fun main() {
    val publisher = CompletableSubject.create()
    val readOnlyStream = publisher.toSingle {
        "doneee"
    }
    println("1 ${publisher.hasComplete()}")
    readOnlyStream.subscribe { item ->
        println("yay, got it $item")
    }
    println("2 ${publisher.hasComplete()}")
    publisher.onComplete()
    println("3 ${publisher.hasComplete()}")
    Thread.sleep(3000L)
}
outputs:
1 false
2 false
yay, got it doneee
3 true
As akarnokd said in the comments, readOnlyStream.subscribe { item -> ... } only registers a callback for items (onNext), and no item is ever emitted.
That does not mean the onComplete event failed to reach the Flowable, though. You can subscribe to the onComplete event like this:
readOnlyStream.subscribe(
    {}, // onNext, called for each item
    {}, // onError, called once when there is an error
    { println("yay, got onComplete") }, // onComplete, called once when completed
)
CompletableSubject's own subscribe method takes no onNext callback, because a Completable never emits items; it only signals onError or onComplete.
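If all you need is a read-only view, one option is to skip the Flowable conversion and expose the subject as a plain Completable via hide(). The following is a minimal sketch, not a definitive implementation. It assumes RxJava 3 (RxJava 2 uses the io.reactivex package prefix instead of io.reactivex.rxjava3), and the names runDemo and events are made up for illustration.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.subjects.CompletableSubject

// Hypothetical helper: runs the scenario and returns the observed events in order.
fun runDemo(): List<String> {
    val events = mutableListOf<String>()
    val publisher = CompletableSubject.create()

    // hide() wraps the subject in a plain Completable, so consumers
    // cannot cast it back to CompletableSubject and call onComplete().
    val readOnly: Completable = publisher.hide()

    // Completable.subscribe takes onComplete and onError only; there is no onNext.
    readOnly.subscribe(
        { events += "completed" },       // onComplete
        { e -> events += "error: $e" }   // onError
    )

    events += "before onComplete"
    publisher.onComplete()               // delivered synchronously to the subscriber
    events += "after onComplete"
    return events
}

fun main() {
    runDemo().forEach(::println)
}
```

Since no scheduler is involved here, the subscriber is notified on the calling thread inside publisher.onComplete(), so no Thread.sleep is needed to observe the event.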