java kotlin rx-java2 rx-kotlin2

What's the behavior of onBackpressureBuffer in RxJava2


I want a Flowable with a backpressure buffer of one item that always keeps the latest item produced by the stream.

I've tried Flowable.onBackpressureBuffer(1, () -> {}, BackpressureOverflowStrategy.DROP_OLDEST). However, it doesn't work as I expected:

  Flowable.range(0, 10_000)
      .onBackpressureBuffer(1, {}, BackpressureOverflowStrategy.DROP_OLDEST)
      .observeOn(Schedulers.computation())
      .subscribe {
        println(it)
        Thread.sleep(5)
      }

The output I expected is a sequence of integers, not necessarily contiguous, that includes the last item, 9,999. However, it only printed the first few contiguous numbers (0, 1, 2, 3, 4, ...), a different count each time, and never the last number, 9,999.


Solution

  • I am using the code below, and it always prints 9999 at the end: it first prints the consecutive numbers 0 through 127 and then 9999. In your case the main thread probably ends well before the thread that prints the numbers has finished. Schedulers.computation() runs on daemon threads, so the JVM exits as soon as main returns. To print every number up to 9999, I changed the backpressure buffer size to 10000 (and made the main thread sleep much longer); with a buffer that large nothing is dropped, so all the numbers are printed.

    import io.reactivex.BackpressureOverflowStrategy;
    import io.reactivex.Flowable;
    import io.reactivex.schedulers.Schedulers;

    public class FlowableTest {

        public static void main(String[] args) throws InterruptedException {
            Flowable.range(0, 10_000)
                    // Keep at most one pending item; on overflow, evict the oldest one.
                    .onBackpressureBuffer(1, () -> { }, BackpressureOverflowStrategy.DROP_OLDEST)
                    .observeOn(Schedulers.computation())
                    .subscribe(it -> {
                        System.out.println(it);
                        Thread.sleep(5); // simulate a slow consumer
                    });

            // Keep the main thread alive long enough for the computation thread to finish.
            Thread.sleep(50000);
        }
    }
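
To see why the output is 0 through 127 followed by 9999, here is a rough model of the pipeline in plain Java, without RxJava. It is only a sketch under stated assumptions, not how the library is implemented: the class DropOldestModel and the method simulate are hypothetical names, the initial demand of 128 stands in for observeOn's default prefetch (Flowable.bufferSize()), and the source is assumed to finish emitting before the slow consumer requests anything more.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not RxJava code: a fast synchronous source feeding a
// slow consumer through a bounded buffer that drops its oldest item on overflow.
public class DropOldestModel {

    // count: items emitted by the source; prefetch: initial downstream demand;
    // capacity: buffer size (assumed to be at least 1).
    static List<Integer> simulate(int count, int prefetch, int capacity) {
        List<Integer> delivered = new ArrayList<>();
        ArrayDeque<Integer> buffer = new ArrayDeque<>();
        long requested = prefetch;

        for (int i = 0; i < count; i++) {
            if (requested > 0) {
                // Outstanding demand: the item goes straight downstream.
                delivered.add(i);
                requested--;
            } else {
                // No demand left: buffer the item, evicting the oldest if full.
                if (buffer.size() >= capacity) {
                    buffer.pollFirst();
                }
                buffer.addLast(i);
            }
        }
        // The consumer eventually catches up and requests more, which drains
        // whatever survived in the buffer.
        delivered.addAll(buffer);
        return delivered;
    }

    public static void main(String[] args) {
        List<Integer> out = simulate(10_000, 128, 1);
        System.out.println(out.size());              // 129
        System.out.println(out.get(out.size() - 1)); // 9999
    }
}
```

With a capacity of 10_000 instead of 1, the same model delivers all 10,000 items, which matches what I saw after enlarging the buffer.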