Why is the subscribe never printing anything here? Just out of curiosity. This is bad practice anyway: I would normally use observeOn instead. However, I can't figure out why the subscribe is never reached...
val subject: PublishSubject<Int> = PublishSubject.create()
val countDownLatch = CountDownLatch(1)
subject
.map { it + 1 }
.subscribeOn(Schedulers.computation())
.subscribe {
println(Thread.currentThread().name)
countDownLatch.countDown()
}
subject.onNext(1)
countDownLatch.await()
In the process of subscribing, an observer signals its readiness to receive items to the observable via a Subscribe notification. See the Observable contract for details.
Furthermore, the Subject documentation states:
"Note that a PublishSubject may begin emitting items immediately upon creation (unless you have taken steps to prevent this), and so there is a risk that one or more items may be lost between the time the Subject is created and the observer subscribes to it."
When you call subject.onNext(_) immediately after attempting to subscribe on a new thread via .subscribeOn(Schedulers.computation()), the observable (i.e. subject) may still be waiting for a Subscribe notification from the observer. For example:
subject
.subscribeOn(Schedulers.computation())
.subscribe { println("received item") }
// this usually prints nothing!
subject.onNext(1)
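To see the timing issue directly, here is a minimal sketch that asks the subject whether an observer is registered right after subscribe() returns. It assumes RxJava 3 package names (RxJava 2 uses io.reactivex instead); the function name registeredImmediately is made up for illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject

// Hypothetical helper: reports whether the subject already has an observer
// at the moment subscribe() returns on the calling thread.
fun registeredImmediately(useSubscribeOn: Boolean): Boolean {
    val subject: PublishSubject<Int> = PublishSubject.create()
    val source: Observable<Int> =
        if (useSubscribeOn) subject.subscribeOn(Schedulers.computation()) else subject
    source.subscribe { /* ignore items */ }
    return subject.hasObservers()
}

fun main() {
    // Without subscribeOn the subscription is synchronous, so this is true.
    println(registeredImmediately(useSubscribeOn = false))
    // With subscribeOn the subscription is handed to another thread, so the
    // result depends on timing and is usually false.
    println(registeredImmediately(useSubscribeOn = true))
}
```

Any onNext call made while hasObservers() is still false goes to nobody, which is exactly the lost item in the snippet above.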
However, if you add a bit of a time delay before you emit your first item, the observable is much more likely to receive the Subscribe notification from the observer before you call subject.onNext(_). For example:
subject
.subscribeOn(Schedulers.computation())
.subscribe { println("received item") }
// wait for subscription to be established properly
Thread.sleep(1000)
// this usually prints "received item"
subject.onNext(1)
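A fixed sleep is only a guess at how long the subscription takes. As a rough sketch of a more deterministic wait, you could poll subject.hasObservers() instead. This assumes RxJava 3 package names, and emitOnceSubscribed is a made-up name for illustration.

```kotlin
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns true if the item was delivered to the observer.
fun emitOnceSubscribed(): Boolean {
    val subject: PublishSubject<Int> = PublishSubject.create()
    val received = CountDownLatch(1)

    subject
        .subscribeOn(Schedulers.computation())
        .subscribe { received.countDown() }

    // Wait until the subject has actually registered the observer.
    while (!subject.hasObservers()) {
        Thread.sleep(1)
    }

    // The observer is registered now, so this item is not lost.
    subject.onNext(1)
    return received.await(5, TimeUnit.SECONDS)
}

fun main() {
    println(emitOnceSubscribed())
}
```

Note that the item is still delivered on the thread that calls onNext: subscribeOn only moves the act of subscribing, not the emissions of a subject.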
If you want all your subscriptions to receive all the items emitted by an observable, you can do one of the following:
subject.onNext(_)
.subject.onNext(_)
inside itself.
These might also be of use:
ReplaySubject: This allows you to store a history of all previous items, and re-emit them on each subscription. Downside: you need to store some arbitrary number of items in memory.
ConnectableObservable: This ensures that an observable only emits items after .connect() is called. In particular, the .autoConnect(n) operator ensures that the observable only emits after n observers have successfully subscribed.
val subject: PublishSubject<Int> = PublishSubject.create()
val countDownLatch = CountDownLatch(1)
val isSubscribedLatch = CountDownLatch(1)
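subject
// doOnSubscribe sits above subscribeOn so that it fires on the computation thread, when the subject itself is being subscribed to.
// Placed below subscribeOn, it would fire on the calling thread as soon as subscribe() is called.
.doOnSubscribe { isSubscribedLatch.countDown() }
.subscribeOn(Schedulers.computation())
.map { it + 1 }
.subscribe {
countDownLatch.countDown()
println(Thread.currentThread().name)
}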
isSubscribedLatch.await()
subject.onNext(1)
countDownLatch.await()
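For the ReplaySubject option mentioned above, a minimal sketch could look like the following. It assumes RxJava 3 package names and an unbounded replay buffer; replayToLateSubscriber is a made-up name for illustration.

```kotlin
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.ReplaySubject
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical helper: collects what a late subscriber receives.
fun replayToLateSubscriber(): List<Int> {
    val subject: ReplaySubject<Int> = ReplaySubject.create()
    val received = CopyOnWriteArrayList<Int>()
    val done = CountDownLatch(1)

    // Emitted before any observer exists; the ReplaySubject buffers it.
    subject.onNext(1)

    subject
        .map { it + 1 }
        .subscribeOn(Schedulers.computation())
        .subscribe {
            received.add(it)
            done.countDown()
        }

    // The buffered item is replayed once the subscription is established.
    done.await(5, TimeUnit.SECONDS)
    return received
}

fun main() {
    println(replayToLateSubscriber()) // prints [2]
}
```

Here the order of onNext and subscribe no longer matters, at the cost of keeping emitted items in memory.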