Tags: java, rx-java, reactive-programming, rx-java2

How can a subscriber control a Publisher with reactive pull backpressure?


I have a publisher that may publish faster than the subscriber can process the data. To handle this, I started working with backpressure. Because I do not want to discard any data, I use reactive pull backpressure. I understood this as the Subscriber being able to tell the Publisher when to publish more data, as described in this and the following paragraphs.

The publisher is a Flowable that does its work asynchronously in parallel and is merged back into a sequential Flowable afterwards. Data should be buffered up to 10 elements, and when this buffer is full, the Flowable should stop publishing and wait for the next request.

The subscriber is a DisposableSubscriber that requests 10 items at the start. Every consumed item requires some computation, after which one new item is requested.
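For comparison, a minimal sketch of the pull model described above, assuming nothing sits between the subscriber and a backpressure-aware source: Flowable.range emits only what has been requested, so requesting 10 and then 1 more delivers exactly 10 and then 11 items. TestSubscriber is used instead of a DisposableSubscriber only because it exposes request(n) and the received values directly; the class name PullDemo is illustrative.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;

public class PullDemo {
    // Requests `initial` items, then `more` items, and returns how many had arrived after each step.
    static int[] pull(int initial, int more) {
        TestSubscriber<Integer> ts = Flowable.range(0, 200).test(initial); // subscribe with an initial request
        int afterInitial = ts.values().size(); // range emits synchronously, never beyond the requested amount
        ts.request(more);                      // the same request(n) call a DisposableSubscriber makes
        int afterMore = ts.values().size();
        ts.dispose();
        return new int[] {afterInitial, afterMore};
    }

    public static void main(String[] args) {
        int[] counts = pull(10, 1);
        System.out.println("after request(10): " + counts[0]); // 10
        System.out.println("after request(1): " + counts[1]);  // 11
    }
}
```

Every operator placed between the two (observeOn, onBackpressureBuffer, the parallel stages) issues its own requests to its upstream, which is where the chain in the question departs from this picture.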

My MWE looks like this:

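import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.DisposableSubscriber;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;

List<Integer> src = new ArrayList<>();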
for (int i = 0; i < 200; i++) {
    src.add(i);
}
Flowable.fromIterable(src)
        .parallel(10, 1)
        .runOn(Schedulers.from(Executors.newFixedThreadPool(10)))
        .flatMap(i -> Single.fromCallable(() -> {
            System.out.println("publisher: " + i);
            Thread.sleep(200);
            return i;
        }).toFlowable())
        .sequential(1)
        .onBackpressureBuffer(10)
        .observeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.newThread())
        .doOnError(Throwable::printStackTrace)
        .subscribeWith(new DisposableSubscriber<Integer>() {
            @Override
            protected void onStart() {
                request(10);
            }
            @Override
            public void onNext(Integer integer) {
                System.out.println("subscriber: " + integer);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                request(1);
            }
            @Override
            public void onError(Throwable t) {
            }
            @Override
            public void onComplete() {
            }
        });
// keep the main thread alive while the asynchronous pipeline runs
try {
    Thread.sleep(1000000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

What I expect this code to do is the following: The subscriber requests the first 10 items. The publisher publishes the first 10 items. The subscriber then does its computation in onNext and requests more items, which the publisher will publish.

What actually happens: at first, the publisher seems to publish items without any bound. At some point, e.g. after 14 published items, the subscriber handles its first item. Meanwhile, the publisher keeps publishing items. After around 30 published items, an io.reactivex.exceptions.MissingBackpressureException: Buffer is full is thrown and the stream ends.

My question: what am I doing wrong? How can I let the subscriber control if and when the publisher publishes data? Obviously I am doing something horribly wrong; otherwise my expectation would not differ so much from reality.

Example output of the above MWE:

publisher: 5
publisher: 7
publisher: 8
publisher: 0
publisher: 2
publisher: 6
publisher: 9
publisher: 3
publisher: 4
publisher: 1
publisher: 18
publisher: 17
publisher: 15
subscriber: 0
publisher: 11
publisher: 10
publisher: 19
publisher: 13
publisher: 14
publisher: 12
publisher: 16
publisher: 27
publisher: 28
publisher: 23
publisher: 21
publisher: 29
publisher: 20
publisher: 25
publisher: 22
publisher: 26
io.reactivex.exceptions.MissingBackpressureException: Buffer is full
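A minimal sketch, isolated from the threading in the MWE, of how onBackpressureBuffer(10) behaves: it requests everything (Long.MAX_VALUE) from its upstream and signals MissingBackpressureException once its bounded buffer overflows, rather than pausing the source. Here the subscriber requests nothing at all (test(0)), so Flowable.range(0, 100) overflows the buffer immediately. The class name BufferOverflowDemo is illustrative.

```java
import io.reactivex.Flowable;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.subscribers.TestSubscriber;

public class BufferOverflowDemo {
    // Returns the first error the subscriber receives, or null if none arrived.
    static Throwable overflow() {
        TestSubscriber<Integer> ts = Flowable.range(0, 100)
                .onBackpressureBuffer(10) // requests Long.MAX_VALUE from range, keeps a bounded buffer
                .test(0);                 // the subscriber requests nothing yet
        ts.assertNoValues();              // nothing was requested, so nothing was delivered
        return ts.errors().isEmpty() ? null : ts.errors().get(0);
    }

    public static void main(String[] args) {
        Throwable error = overflow();
        System.out.println(error instanceof MissingBackpressureException); // true
        System.out.println(error.getMessage());
    }
}
```

In other words, a full buffer does not turn into "stop publishing"; it turns into an error, because the operator has already asked its upstream for everything.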

Solution

  • Not an expert in Rx, but let me take a stab at it. observeOn(...) has its own default buffer size of 128, so right from the start it requests more from upstream than your buffer can hold.

    observeOn(...) accepts an optional buffer size override, but even if you supply it, the ParallelFlowable is going to invoke your flatMap(...) function more often than you want. I'm not 100% sure why; part of it is likely internal buffering in the parallel stages, since runOn(...), for instance, prefetches Flowable.bufferSize() items (128 by default) per rail unless you pass a smaller prefetch.

    I think you can get closer to your desired behavior by using flatMap(...) with a maxConcurrency argument instead of parallel(...).

    One other thing to keep in mind is that you don't want to call subscribeOn(...) here: it is meant to affect the upstream Flowable in its entirety, so when you're already calling parallel(...).runOn(...), it either has no effect or an unexpected one.

    Armed with the above, I think this gets you closer to what you're looking for:

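        import io.reactivex.Flowable;
        import io.reactivex.Scheduler;
        import io.reactivex.schedulers.Schedulers;
        import io.reactivex.subscribers.DisposableSubscriber;
        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.Executors;

        List<Integer> src = new ArrayList<>();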
        for (int i = 0; i < 200; i++) {
            src.add(i);
        }
        Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10));
        Flowable.fromIterable(src)
                .flatMap(
                        i -> Flowable.just( i )
                                .subscribeOn(scheduler) // here subscribeOn(...) affects just this nested Flowable
                                .map( __ -> {
                                    System.out.println("publisher: " + i);
                                    Thread.sleep(200);
                                    return i;
                                } ),
                        10) // max concurrency
                .observeOn(Schedulers.newThread(), false, 10) // override buffer size
                .doOnError(Throwable::printStackTrace)
                .subscribeWith(new DisposableSubscriber<Integer>() {
                    @Override
                    protected void onStart() {
                        request(10);
                    }
                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("subscriber: " + integer);
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        request(1);
                    }
                    @Override
                    public void onError(Throwable t) {
                    }
                    @Override
                    public void onComplete() {
                    }
                });
        // keep the main thread alive while the asynchronous pipeline runs
        try {
            Thread.sleep(1000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
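A minimal sketch for checking the observeOn(...) buffer-size point above, assuming Flowable.range as a stand-in source: doOnRequest records each request(n) that reaches the source while the final subscriber asks for only 10 items. With the default observeOn(...) the first upstream request should equal Flowable.bufferSize() (128 by default); with observeOn(scheduler, false, 10) it should be 10. The class and method names are illustrative.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ObserveOnRequestDemo {
    // First request(n) that reaches the source while the final subscriber asks for only 10 items.
    // bufferSize <= 0 means "use observeOn's default buffer size".
    static long firstUpstreamRequest(int bufferSize) {
        List<Long> requests = new CopyOnWriteArrayList<>();
        Flowable<Integer> source = Flowable.range(0, 1000)
                .doOnRequest(n -> requests.add(n)); // record each request(n) sent to range
        Flowable<Integer> shifted = bufferSize > 0
                ? source.observeOn(Schedulers.single(), false, bufferSize)
                : source.observeOn(Schedulers.single());
        TestSubscriber<Integer> ts = shifted.test(10); // the subscriber itself only wants 10
        long first = requests.get(0); // observeOn requests its buffer size while subscribing
        ts.dispose();
        return first;
    }

    public static void main(String[] args) {
        System.out.println("default observeOn: " + firstUpstreamRequest(0));
        System.out.println("observeOn(..., 10): " + firstUpstreamRequest(10));
    }
}
```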
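If you would rather keep parallel(...), here is a sketch under stated assumptions, not a verified fix for your exact timing: the per-item work is assumed to be synchronous, so the rail flatMap(...) is replaced with map(...); runOn(Scheduler, int) and sequential(int) get a prefetch of 1 so the rails do not pull far ahead; and onBackpressureBuffer(10) is dropped, because every operator in this chain honours downstream requests, so there is no fixed-size buffer left to overflow. The subscriber mirrors the one in the question without the sleep. The class name, method name and the pool size of 4 are illustrative.

```java
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.DisposableSubscriber;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class BoundedParallelDemo {
    // Returns the number of items received before onComplete, or -1 on error or timeout.
    static int run(int count, long workMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Scheduler scheduler = Schedulers.from(pool);
        AtomicInteger received = new AtomicInteger();
        AtomicReference<Throwable> error = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            Flowable.range(0, count)
                    .parallel(4, 1)          // 4 rails; the source is pulled with a prefetch of 1
                    .runOn(scheduler, 1)     // each rail prefetches 1 item instead of Flowable.bufferSize()
                    .map(i -> {
                        Thread.sleep(workMillis); // simulated synchronous work, so map is enough
                        return i;
                    })
                    .sequential(1)           // merge the rails, prefetching 1 item per rail
                    .observeOn(Schedulers.newThread(), false, 10)
                    .subscribe(new DisposableSubscriber<Integer>() {
                        @Override
                        protected void onStart() {
                            request(10);     // initial demand, as in the question
                        }
                        @Override
                        public void onNext(Integer item) {
                            received.incrementAndGet();
                            request(1);      // ask for one more after each processed item
                        }
                        @Override
                        public void onError(Throwable t) {
                            error.set(t);
                            done.countDown();
                        }
                        @Override
                        public void onComplete() {
                            done.countDown();
                        }
                    });
            boolean finished = done.await(30, TimeUnit.SECONDS);
            return finished && error.get() == null ? received.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("received: " + run(200, 20));
    }
}
```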
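A small sketch that checks what the maxConcurrency argument of flatMap(...) buys: it counts how many inner Flowables are inside their map(...) at the same time. Because flatMap subscribes to at most maxConcurrency inner sources and only requests a replacement from upstream when one completes, the peak should never exceed that argument. Schedulers.io(), the counters and the class name are assumptions made for the illustration.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class MaxConcurrencyDemo {
    // Returns the highest number of inner jobs that ever ran at the same time.
    static int peakConcurrency(int count, int maxConcurrency) {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Integer> results = Flowable.range(0, count)
                .flatMap(i -> Flowable.just(i)
                                .subscribeOn(Schedulers.io()) // each inner Flowable runs on its own worker
                                .map(x -> {
                                    int now = active.incrementAndGet();
                                    peak.accumulateAndGet(now, Math::max); // remember the highest value seen
                                    Thread.sleep(20);                       // simulated work
                                    active.decrementAndGet();
                                    return x;
                                }),
                        maxConcurrency) // at most this many inner Flowables are subscribed at once
                .toList()
                .blockingGet();
        if (results.size() != count) {
            throw new IllegalStateException("expected " + count + " results, got " + results.size());
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(20, 3));
    }
}
```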
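A sketch of the subscribeOn(...) point, using thread names as evidence (it assumes RxJava's default thread-name prefixes RxNewThreadScheduler and RxSingleScheduler): wherever subscribeOn(...) sits in the chain, it moves the subscription to the source, and when there are several, the one closest to the source determines where the source runs. The class and method names are illustrative.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

public class SubscribeOnDemo {
    // Thread on which fromCallable runs when subscribeOn is the last operator in the chain.
    static String sourceThreadWithTrailingSubscribeOn() {
        return Flowable.fromCallable(() -> Thread.currentThread().getName())
                .observeOn(Schedulers.computation()) // moves delivery downstream, not the source
                .subscribeOn(Schedulers.newThread())  // still decides where the source is subscribed
                .blockingFirst();
    }

    // With two subscribeOn calls, the one closest to the source wins.
    static String sourceThreadWithTwoSubscribeOns() {
        return Flowable.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.single())
                .subscribeOn(Schedulers.newThread())
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(sourceThreadWithTrailingSubscribeOn()); // starts with RxNewThreadScheduler
        System.out.println(sourceThreadWithTwoSubscribeOns());     // starts with RxSingleScheduler
    }
}
```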