How to properly apply backpressure to PublishSubject?


I have a PublishSubject which emits location updates (LatLng):

val location = PublishSubject.create<LatLng>()

Now I want to do something with each location that may take some time and should be done sequentially:

location
    .observeOn(Schedulers.computation())
    .subscribeOn(Schedulers.io())
    .subscribe {
        // ... CPU-heavy operation
    }

The next operation may only start after the previous one in the subscriber has completed. Moreover, each update makes the previous one obsolete, so whenever the subscriber is ready for another value it should receive only the most recent one. I therefore thought of applying backpressure:

location
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .toFlowable(BackpressureStrategy.LATEST)
    .subscribeWith(object : DisposableSubscriber<LatLng>() {
        override fun onStart() {
            request(1)
        }

        override fun onError(t: Throwable) {
            // ...
        }

        override fun onComplete() {
            // ...
        }

        override fun onNext(t: LatLng) {
            // ... CPU-heavy task
            request(1)
        }
    })

Unfortunately, this doesn't work: every emitted LatLng is delivered to the subscriber, and none are skipped as intended.


Solution

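  • The problem is that observeOn can always buffer at least one element. In the question's code it is also applied before toFlowable, while the stream is still an Observable without backpressure, so it queues every item before the LATEST strategy gets a chance to drop any. You can achieve the desired effect with delay instead: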

    location
        .toFlowable(BackpressureStrategy.LATEST)
        .delay(0, TimeUnit.SECONDS, Schedulers.computation())
        .subscribeWith(object : DisposableSubscriber<LatLng>() {
            override fun onStart() {
                request(1)
            }
    
            override fun onError(t: Throwable) {
                // ...
            }
    
            override fun onComplete() {
                // ...
            }
    
            override fun onNext(t: LatLng) {
                // ... CPU-heavy task
                request(1)
            }
        })
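
    To see the skipping in action, here is a minimal, self-contained sketch of the same pipeline. It assumes RxJava 2.x (the io.reactivex packages) is on the classpath; the Int source, the function name processedValues, and the sleep durations are made up for illustration and stand in for the location subject and the CPU-heavy task.

    ```kotlin
    import io.reactivex.BackpressureStrategy
    import io.reactivex.schedulers.Schedulers
    import io.reactivex.subjects.PublishSubject
    import io.reactivex.subscribers.DisposableSubscriber
    import java.util.concurrent.CopyOnWriteArrayList
    import java.util.concurrent.TimeUnit

    // Hypothetical helper: pushes 1..50 quickly into a slow subscriber
    // and returns the values the subscriber actually processed.
    fun processedValues(): List<Int> {
        val source = PublishSubject.create<Int>() // stand-in for the location subject
        val processed = CopyOnWriteArrayList<Int>()

        val subscriber = source
            .toFlowable(BackpressureStrategy.LATEST)
            .delay(0, TimeUnit.SECONDS, Schedulers.computation())
            .subscribeWith(object : DisposableSubscriber<Int>() {
                override fun onStart() {
                    request(1) // ask for exactly one item up front
                }

                override fun onNext(t: Int) {
                    Thread.sleep(100) // simulated CPU-heavy task
                    processed.add(t)
                    request(1) // ready for the next (latest) item
                }

                override fun onError(t: Throwable) {
                    t.printStackTrace()
                }

                override fun onComplete() {}
            })

        // Emit roughly ten times faster than the subscriber can process.
        for (i in 1..50) {
            source.onNext(i)
            Thread.sleep(10)
        }
        Thread.sleep(500) // let the last in-flight items finish
        subscriber.dispose()
        return processed.toList()
    }

    fun main() {
        println(processedValues())
    }
    ```

    With these timings the printed list should contain only a handful of the 50 values, in increasing order; the exact values depend on thread scheduling.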
    

    Additional remarks:

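    1. subscribeOn has no practical effect with a Subject and can be omitted: it only changes the thread on which the subscription happens, while items are still delivered on whichever thread calls onNext on the Subject.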
    2. The toFlowable is best applied close to the source to avoid flooding delay with items unnecessarily.
    3. If you don't mind 3rd party operators, there is an observeOnLatest operator that can do the same thing.
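
    The first remark can be checked with a small sketch, again assuming RxJava 2.x. The function name observerThreadName and the 200 ms wait are illustrative assumptions; the wait only gives the asynchronous subscription time to complete before the item is emitted.

    ```kotlin
    import io.reactivex.schedulers.Schedulers
    import io.reactivex.subjects.PublishSubject

    // Hypothetical helper: returns the name of the thread on which
    // the observer received the item.
    fun observerThreadName(): String {
        val subject = PublishSubject.create<Int>()
        var seenOn = "<nothing received>"

        val disposable = subject
            .subscribeOn(Schedulers.io()) // affects only where subscribing happens
            .subscribe { seenOn = Thread.currentThread().name }

        Thread.sleep(200)  // wait until the subscription has been set up
        subject.onNext(1)  // delivered synchronously on the calling thread
        disposable.dispose()
        return seenOn
    }

    fun main() {
        println("caller:   ${Thread.currentThread().name}")
        println("observer: ${observerThreadName()}")
    }
    ```

    Both lines should name the same thread, because the Subject hands the item to its observers on the thread that called onNext. Note that with subscribeOn in place, anything emitted before the asynchronous subscription completes would simply be missed.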