Search code examples
kotlinrx-java3

PublishSubject `subscribeOn` behavior


Why is the subscribe never printing anything here? Just out of curiosity. This is bad practice anyways: I would normally use observeOn instead. However, I can't figure out why the subscribe is never reached...

val subject: PublishSubject<Int> = PublishSubject.create()
val countDownLatch = CountDownLatch(1)

subject
    .map { it + 1 }
    .subscribeOn(Schedulers.computation())
    .subscribe {
        println(Thread.currentThread().name)
        countDownLatch.countDown()
    }

subject.onNext(1)
countDownLatch.await()

Solution

  • Why this happens

    In the process of subscribing, an observer signals its readiness to receive items to the observable via a Subscribe notification. See the Observable contract for details.

    Furthermore, the Subject documentation states:

    Note that a PublishSubject may begin emitting items immediately upon creation (unless you have taken steps to prevent this), and so there is a risk that one or more items may be lost between the time the Subject is created and the observer subscribes to it.

    When you call subject.onNext(_) immediately after attempting to subscribe on a new thread via .subscribeOn(Schedulers.computation()), the observable (i.e. subject) may still be waiting for a Subscribe notification from the observer. For example:

    subject
        .subscribeOn(Schedulers.computation())
        .subscribe { println("received item") }
    
    // this usually prints nothing!
    subject.onNext(1)
    

    However, if you add a bit of a time delay before you emit your first item, the observable is much more likely to receive the Subscribe notification from the observer before you call subject.onNext(_). For example:

    subject
        .subscribeOn(Schedulers.computation())
        .subscribe { println("received item") }
    
    // wait for subscription to be established properly
    Thread.sleep(1000)
    
    // this usually prints "received item"
    subject.onNext(1)
    

    What to do?

    If you want all your subscriptions to receive all the items emitted by an observable, you can do one of the following:

    • Block the main thread to wait until all observers are subscribed before you call subject.onNext(_).
    • Create a new observable that waits until all observables are subscribed before calling subject.onNext(_) inside itself.

    These might also be of use:

    • ReplaySubject: This allows you to store a history of all previous items, and re-emit them on each subscription. Downside: you need to store some arbitrary number of items in memory.
    • ConnectableObservable: This ensures that an observable only emits items after .connect() is called. In particular, the .autoConnect(n) operator ensures that the observable only emits after n observers have successfully subscribed.

    Example: blocking the main thread until subscribed

    val subject: PublishSubject<Int> = PublishSubject.create()
    val countDownLatch = CountDownLatch(1)
    val isSubscribedLatch = CountDownLatch(1)
    
    subject
        .subscribeOn(Schedulers.computation())
        .doOnSubscribe { isSubscribedLatch.countDown() }
        .map { it + 1 }
        .subscribe {
            countDownLatch.countDown()
            println(Thread.currentThread().name)
        }
    
    isSubscribedLatch.await()
    subject.onNext(1)
    countDownLatch.await()