Search code examples
rx-javarx-kotlinpublishsubject

onNext() never gets called on a PublishSubject


I am trying to build a presenter that calculates some events within some time period, shows a loading only the first time of loading, and updates the ui when it is done. Because the events can be updated via multiple ways (such as user preferences) I need to be able to tell the presenter that the events were updated and that it has to refresh them again. Here is what I have right now:

                      subject
                            .map<List<UpcomingRowViewModel>> {
                                provider.calculateEventsBetween(TimePeriod.aYearFrom(firstDay))
                            }
                            .doOnSubscribe {
                                view.showLoading()
                            }
                            .observeOn(resultScheduler)
                            .subscribeOn(workScheduler)
                            .subscribe { upcomingRowViewModels ->
                                view.display(upcomingRowViewModels)
                            }
                      subject.onNext(TRIGGER)

The subject is a PublishSubject of Int. I do the onNext() right after the subscription because I want the data to be refreshed as soon as I subscribe to them.

The above code works wonders in my unit tests and also only when I am running it on a device with the debugger attached. If I just run it (without any debugger), it reaches the view.showLoading() part, but never the provider.calculateEventsBetween(TimePeriod.aYearFrom(firstDay) so the UI gets 'stuck' with the loading.

Any ideas?


Solution

  • The likely reason you don't see the consumer called is .subscribeOn(workScheduler). By applyint this to a Subject, which by itself has no practical use as there are no subscription side-effects when subscribing to a Subject, you delay the subscription to the Subject just enough that the onNext call won't find any observers at that moment.

    What you probably want is something like this:

    subject
        .observeOn(resultScheduler)             // <--------------- (1)
        .doOnNext {
            view.showLoading()
        }
        .observeOn(workScheduler)               // <--------------- (2)
        .map<List<UpcomingRowViewModel>> {
            provider.calculateEventsBetween(
                TimePeriod.aYearFrom(firstDay))
        }
        .observeOn(resultScheduler)             // <--------------- (3)
        .subscribe { upcomingRowViewModels ->
            view.display(upcomingRowViewModels)
        }
    subject.onNext(TRIGGER)
    

    Instead of doOnSubscribe, which gets executed once, (1) makes sure when there is work to do, the subject's onNext emission will trigger the loading indicator on the main thread (assuming resultScheduler is AndroidSchedulers.mainThread in non-tests). Then you'd want to perform the mapping off the main thread and thus (2) moves the item onto the background thread. Once the mapping has happened, the resulting item is then moved onto the main thread again at (3) where your view can display it.