android rx-java2

Using share(): post last value to new subscribers


@Override
public void onPause() {
    super.onPause();
    compositeDisposable.clear();
}

@Override
public void onResume() {
    super.onResume();

    Flowable<MyData> distanceFlowable = myDataProcessor.hide().onBackpressureLatest()
            .distinctUntilChanged()
            .share();

    // add() must wrap the whole chain so that it receives the Disposable from subscribe()
    compositeDisposable.add(distanceFlowable
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(data -> Log.w("observer 1", String.valueOf(data.value))));

    compositeDisposable.add(distanceFlowable
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(data -> Log.w("observer 2", String.valueOf(data.value))));
}

The code works well when distanceFlowable receives new values continuously. However, when there is only a single post on distanceFlowable, only observer 1 gets notified.

It behaves like this:

  1. post on distanceFlowable
  2. observer 1 subscribes
  3. logcat prints "observer 1: 133"
  4. observer 2 subscribes

I would like it to behave like this:

  1. post on distanceFlowable
  2. observer 1 subscribes
  3. logcat prints "observer 1: 133"
  4. observer 2 subscribes
  5. logcat prints "observer 2: 133"

I tried using a ConnectableFlowable instead, created with publish(), and calling connect() on it after both observer 1 and observer 2 have subscribed. But then it keeps posting on the flowable even after the compositeDisposable is cleared and no one is listening.

What is the preferred way to solve this?


Solution

  • I missed that flowable.connect() returns a Disposable that I can add to my compositeDisposable, so the upstream connection is disposed along with the subscribers when the compositeDisposable is cleared in onPause.
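A minimal sketch of that fix, runnable on a plain JVM. As assumptions for illustration: a BehaviorProcessor of Integer stands in for myDataProcessor, observeOn is omitted so the calls run synchronously, and the class name ConnectDemo is made up.

```java
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.processors.BehaviorProcessor;

import java.util.ArrayList;
import java.util.List;

public class ConnectDemo {

    // Returns the lines that the two observers would have logged.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CompositeDisposable compositeDisposable = new CompositeDisposable();

        // Assumption: the processor replays its latest value to a new subscriber.
        BehaviorProcessor<Integer> processor = BehaviorProcessor.create();
        processor.onNext(133); // the single post, before anyone subscribes

        // publish() instead of share(): nothing flows until connect() is called.
        ConnectableFlowable<Integer> distanceFlowable = processor.hide()
                .onBackpressureLatest()
                .distinctUntilChanged()
                .publish();

        compositeDisposable.add(
                distanceFlowable.subscribe(v -> log.add("observer 1: " + v)));
        compositeDisposable.add(
                distanceFlowable.subscribe(v -> log.add("observer 2: " + v)));

        // Connect only after both observers are subscribed, and keep the
        // returned Disposable so the upstream connection can be torn down too.
        compositeDisposable.add(distanceFlowable.connect());

        // What onPause() does in the question.
        compositeDisposable.clear();

        // Posted after clear(): the connection is disposed, so nothing is logged.
        processor.onNext(134);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With both observers subscribed before connect(), each of them receives 133, and the value posted after clear() is not delivered to anyone.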