
Reactor - only keep/process the latest value for slow consumer


In Reactor, when I have a fast producer but a slow consumer, and the values in the stream are "snapshots", I would like the consumer to process only the latest value in the stream and drop the others. (For example, a consumer that shows the current exchange value in a GUI vs. a producer that converts exchange ticks into a Flux.)

The Flux#onBackpressureLatest() operator seems to be the right way to go.
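
A minimal sketch of what onBackpressureLatest() does on its own, assuming reactor-core 3.x on the classpath (the helper name `latestOnlyDemo` is hypothetical). A synchronous Flux.range source is paired with a BaseSubscriber that requests items by hand: the operator itself requests everything from upstream, and whenever downstream demand is zero it keeps only the most recent value, overwriting it as newer ones arrive.

```kotlin
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.Flux

// Hypothetical demo helper. Returns what the subscriber has received
// (1) right after subscribing and (2) after one extra manual request(1).
fun latestOnlyDemo(): Pair<List<Int>, List<Int>> {
    val received = mutableListOf<Int>()

    val subscriber = object : BaseSubscriber<Int>() {
        override fun hookOnSubscribe(subscription: Subscription) {
            request(1) // demand exactly one item up front
        }

        override fun hookOnNext(value: Int) {
            received += value // no new request here, so demand drops to 0
        }
    }

    // Flux.range emits 1..10 synchronously inside subscribe().
    // onBackpressureLatest requests everything from range, forwards 1 to meet
    // the pending demand, then keeps overwriting a single slot with 2, 3, ...
    // until only 10 is left waiting.
    Flux.range(1, 10)
        .onBackpressureLatest()
        .subscribe(subscriber)

    val afterSubscribe = received.toList()

    subscriber.request(1) // the retained latest value is delivered now
    val afterSecondRequest = received.toList()

    return afterSubscribe to afterSecondRequest
}

fun main() {
    val (first, second) = latestOnlyDemo()
    println("after subscribe:  $first")
    println("after request(1): $second")
}
```

The first list should contain only 1 and the second should end with 10; values 2 to 9 are never delivered because nobody asked for them while they were the latest.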

I did some googling and found some usage examples:

Flux.range(1, 30)
    .delayElements(Duration.ofMillis(500))
    .onBackpressureLatest()
    .delayElements(Duration.ofMillis(3000))
    .subscribe { println("got $it") }

This puts a manual delay after onBackpressureLatest(), so it behaves more like Flux#sample(Duration) than like a slow consumer.

Internally, the delayElements(Duration) operator wraps a concatMap, so I converted this into:

Flux.range(1, 30)
    .delayElements(Duration.ofMillis(500))
    .onBackpressureLatest()
    .concatMap { Mono.just(it).subscribeOn(Schedulers.boundedElastic()) }
    .subscribe { item ->
        println("got $item")
        // simulate slow subscriber with sleep
        Thread.sleep(3000)
    }

This is similar to the answers to the question "Latest overflow strategy with size 1 or any alternatives". However, it looks a bit weird, and I don't understand why we need the concatMap(op) or flatMap(op, 1, 1) call to get onBackpressureLatest() working.

I tried the following (simplified) versions, but they do not work as expected. Why?

// not working try - 1
Flux.range(1, 30)
    .delayElements(Duration.ofMillis(500))
    .onBackpressureLatest()
    .publishOn(Schedulers.boundedElastic())
    .subscribe { item ->
        println("got $item")
        // simulate slow subscriber with sleep
        Thread.sleep(3000)
    }

// not working try - 2
Flux.range(1, 30)
    .delayElements(Duration.ofMillis(500))
    .onBackpressureLatest()
    .publishOn(Schedulers.boundedElastic())
    .subscribe(object : BaseSubscriber<Int>() {
        override fun hookOnSubscribe(subscription: Subscription) {
            // explicitly request 1
            subscription.request(1)
        }

        override fun hookOnNext(value: Int) {
            // simulate slow subscriber with sleep
            Thread.sleep(3000)
            println("got $value")
            // explicitly request 1
            request(1)
        }
    })

Solution

  • To answer my own question

    When the consumer is slow and the producer is fast, they need to run on different scheduler threads. If they run on the same thread, the whole Flux chain runs synchronously: the consumer and the producer proceed at the same pace on a single thread, so no backlog builds up for onBackpressureLatest() to drop. So the following won't work:

    // Not working: producer and consumer run synchronously on the same thread
    Flux.range(1, 30)
        .delayElements(Duration.ofMillis(300))
        .onBackpressureLatest()
        .subscribe { item ->
            println("got $item")
            // simulate slow subscriber with sleep
            Thread.sleep(1000)
        }
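
One more detail makes this version a no-op: subscribe with a plain lambda requests an unbounded amount (Long.MAX_VALUE), so onBackpressureLatest() never runs out of demand and has no reason to drop anything. A minimal sketch to check that, assuming reactor-core 3.x (the helper name is hypothetical), records the request with doOnRequest; the delay and the sleep are left out so the run is synchronous and quick.

```kotlin
import reactor.core.publisher.Flux
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: returns (requests that reached onBackpressureLatest,
// items the lambda consumer received). No delays, so everything is synchronous.
fun demandFromLambdaSubscriber(): Pair<List<Long>, List<Int>> {
    val requests = CopyOnWriteArrayList<Long>()
    val received = CopyOnWriteArrayList<Int>()

    Flux.range(1, 30)
        .onBackpressureLatest()
        .doOnRequest { n -> requests += n } // demand sent up to onBackpressureLatest
        .subscribe { item -> received += item }

    return requests.toList() to received.toList()
}

fun main() {
    val (requests, received) = demandFromLambdaSubscriber()
    println("requests: $requests") // a single Long.MAX_VALUE, i.e. unbounded
    println("received ${received.size} of 30 items")
}
```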
    

    So we need to switch to another scheduler thread after the producer, so that the consumer runs on a different thread.

    If we switch the scheduler with .publishOn before .onBackpressureLatest(), the rest of the operator chain runs on that one new thread. It is as if we had just started another thread and run the same synchronous flow there, which is essentially the same as the case above, so the following doesn't work either.

    // Not working
    // operator publishOn acts as the producer,
    // and it runs synchronously on the same thread as the consumer
    Flux.range(1, 30)
        .delayElements(Duration.ofMillis(300))
        .publishOn(Schedulers.boundedElastic()) // the following runs synchronously as before
        .onBackpressureLatest()
        .subscribe { item ->
            println("got $item")
            // simulate slow subscriber with sleep
            Thread.sleep(1000)
        }
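
A quick way to check the claim that everything after publishOn runs on one thread is to record the consumer's thread names. This sketch (hypothetical helper name, reactor-core 3.x assumed, no delayElements and a shorter sleep to keep it fast) also shows that no item is dropped, because the lambda subscriber's unbounded demand reaches onBackpressureLatest() directly.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns (items received, names of threads the consumer ran on).
fun publishOnBeforeLatest(): Pair<List<Int>, Set<String>> {
    val received = CopyOnWriteArrayList<Int>()
    val threads = CopyOnWriteArrayList<String>()
    val done = CountDownLatch(1)

    Flux.range(1, 10)
        .publishOn(Schedulers.boundedElastic()) // one worker thread for everything below
        .onBackpressureLatest()
        .subscribe(
            { item ->
                received += item
                threads += Thread.currentThread().name
                Thread.sleep(50) // simulate a slow consumer
            },
            { error -> error.printStackTrace(); done.countDown() },
            { done.countDown() }
        )

    done.await(10, TimeUnit.SECONDS) // wait for completion before inspecting results
    return received.toList() to threads.toSet()
}

fun main() {
    val (received, threads) = publishOnBeforeLatest()
    println("received: $received")
    println("consumer threads: $threads")
}
```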
    

    If we put .publishOn(Schedulers.boundedElastic()) after .onBackpressureLatest(), it doesn't work either. The reason is that the 1-arg publishOn uses a default prefetch of Queues.SMALL_BUFFER_SIZE (256). So it calls request(256) on subscription, which tells .onBackpressureLatest() that downstream is ready for 256 items. As long as that demand is not used up, .onBackpressureLatest() passes every value straight into publishOn's queue without dropping any, and the chain after .publishOn then consumes the queued items one by one, synchronously, at the consumer's pace. So the following doesn't work as expected:

    // Not working
    // 1-arg publishOn has prefetch=256 by default
    // so it requests 256 items from onBackpressureLatest up front
    Flux.range(1, 30)
        .delayElements(Duration.ofMillis(300))
        .onBackpressureLatest()
        .publishOn(Schedulers.boundedElastic()) // requests 256 items up front
        .subscribe { item ->
            println("got $item")
            // simulate slow subscriber with sleep
            Thread.sleep(1000)
        }
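
The 256-item request can be observed by putting doOnRequest between onBackpressureLatest() and publishOn. In this sketch (hypothetical helper name, reactor-core 3.x assumed, default reactor.bufferSize.small setting, delays shortened), the first recorded request is publishOn's prefetch, and every one of the 30 items reaches the slow consumer because they all fit in publishOn's queue.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical helper: runs the chain with the 1-arg publishOn and returns
// (requests publishOn sent to onBackpressureLatest, items the consumer received).
fun defaultPrefetchRun(): Pair<List<Long>, List<Int>> {
    val requests = CopyOnWriteArrayList<Long>()
    val received = CopyOnWriteArrayList<Int>()
    val done = CountDownLatch(1)

    Flux.range(1, 30)
        .onBackpressureLatest()
        .doOnRequest { n -> requests += n }     // observe publishOn's demand
        .publishOn(Schedulers.boundedElastic()) // default prefetch (256 unless reconfigured)
        .subscribe(
            { item ->
                received += item
                Thread.sleep(20) // simulate a slow consumer
            },
            { error -> error.printStackTrace(); done.countDown() },
            { done.countDown() }
        )

    done.await(10, TimeUnit.SECONDS)
    return requests.toList() to received.toList()
}

fun main() {
    val (requests, received) = defaultPrefetchRun()
    println("first request: ${requests.first()}")
    println("received ${received.size} of 30 items")
}
```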
    

    So what we need is for the operators after .onBackpressureLatest() to request only one item at a time, and only when they are ready to process it, i.e., at the consumer's pace. We just need to call .publishOn with its second argument, prefetch, set to 1:

    // Working
    Flux.range(1, 30)
        .delayElements(Duration.ofMillis(300))
        .onBackpressureLatest()
        .publishOn(Schedulers.boundedElastic(), 1) // requests 1 item at a time, only when ready for it
        .subscribe { item ->
            println("got $item")
            // simulate slow subscriber with sleep
            Thread.sleep(1000)
        }
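
The same instrumentation applied to the working version, as a sketch under the same assumptions (hypothetical helper name, reactor-core 3.x): publishOn now requests a single item, and the next request is only made after the consumer returns, so intermediate values are overwritten inside onBackpressureLatest(). The exact values received depend on timing, but with a 100 ms consumer and an instantaneous Flux.range burst you should see the first and the last item and little in between.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical helper: same chain with publishOn(..., 1). Returns
// (requests publishOn sent to onBackpressureLatest, items the consumer received).
fun prefetchOneRun(): Pair<List<Long>, List<Int>> {
    val requests = CopyOnWriteArrayList<Long>()
    val received = CopyOnWriteArrayList<Int>()
    val done = CountDownLatch(1)

    Flux.range(1, 30)
        .onBackpressureLatest()
        .doOnRequest { n -> requests += n }        // observe publishOn's demand
        .publishOn(Schedulers.boundedElastic(), 1) // request one item at a time
        .subscribe(
            { item ->
                received += item
                Thread.sleep(100) // consumer far slower than the Flux.range burst
            },
            { error -> error.printStackTrace(); done.countDown() },
            { done.countDown() }
        )

    done.await(10, TimeUnit.SECONDS)
    return requests.toList() to received.toList()
}

fun main() {
    val (requests, received) = prefetchOneRun()
    println("first request: ${requests.first()}")
    println("received: $received")
}
```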
    

    Either of the following two alternatives from the question can replace the .publishOn line. Both do the same two things: 1) switch to a scheduler thread, and 2) request only one item at a time from onBackpressureLatest().

    • .concatMap { Mono.just(it).subscribeOn(Schedulers.boundedElastic()) }
    • .flatMap({ Mono.just(it).subscribeOn(Schedulers.boundedElastic()) }, 1, 1)
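
A sketch of the flatMap variant under the same assumptions (hypothetical helper name, reactor-core 3.x): with concurrency 1, flatMap requests one item from onBackpressureLatest() and asks for the next only after the current inner Mono has completed, i.e. after the slow consumer has returned.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns the items a slow consumer receives through
// flatMap(op, 1, 1) placed after onBackpressureLatest.
fun flatMapAlternativeRun(): List<Int> {
    val received = CopyOnWriteArrayList<Int>()
    val done = CountDownLatch(1)

    Flux.range(1, 30)
        .onBackpressureLatest()
        // concurrency = 1, prefetch = 1: one inner Mono at a time; the next
        // upstream request is made only after the current inner Mono completes
        .flatMap({ Mono.just(it).subscribeOn(Schedulers.boundedElastic()) }, 1, 1)
        .subscribe(
            { item ->
                received += item
                Thread.sleep(100) // simulate a slow consumer
            },
            { error -> error.printStackTrace(); done.countDown() },
            { done.countDown() }
        )

    done.await(10, TimeUnit.SECONDS)
    return received.toList()
}

fun main() {
    println("received: ${flatMapAlternativeRun()}")
}
```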

    To write the same thing with a BaseSubscriber<T>, we can do:

    Flux.range(1, 30)
        .delayElements(Duration.ofMillis(300))
        .onBackpressureLatest()
        .subscribe(object : BaseSubscriber<Int>() {
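            override fun hookOnSubscribe(subscription: Subscription) {
                request(1) // ask for the first item only
            }
    
            override fun hookOnNext(value: Int) {
                // hand the item to another thread so this call returns at once;
                // meanwhile onBackpressureLatest keeps overwriting its latest value
                Mono.just(value)
                    .subscribeOn(Schedulers.boundedElastic())
                    .subscribe { item ->
                        println("got $item")
                        // simulate slow subscriber with sleep
                        Thread.sleep(1000)
                        request(1) // ask for the next (latest) item only when done
                    }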
            }
        })