Search code examples
javarx-javajava-9java-flow

Java 9 Flow define subscriber with lambdas


I start playing with Java 9 Flow API, and the first thing that I found and dont like, it´s seems like we cannot use lambdas when we pass a subscriber implementation into the publisher as we can do with RxJava

So I have to define and implement my own Subscriber class

public class CustomSubscriber<T> implements Flow.Subscriber<T> {

        protected Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            System.out.println("Subscription done:");
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            System.out.println("Got : " + item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("Done");
        }
    }

And then just pass it to my publisher

   SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    publisher.subscribe(new CustomSubscriber<>());

This is really verbose, and as I understand it´s because we need to set the subscription in the onSubscribe callback

protected Flow.Subscription subscription;     

To later be used in the onNext to continue the emissions subscription.request(1);

I still dont get it why this mechanism it´s needed, but it´s avoiding the use of Lambdas as we do in RxJava as this example

SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(item -> System.out.println("do something in the onNext"),
        e -> System.out.println("do something in the onError"),
        () -> System.out.println("Do something in the onComplete"));

I guess this is not possible and I´m not missing nothing here right?


Solution

  • I still dont get it why this mechanism it´s needed

    The subscription enables communication from subscriber to publisher. The request method allows the subscriber to apply backpressure, informing upstream components that it is overloaded and "needs a break". In order to do that, the subscriber needs to hold on to an instance of the subscription and needs to occasionally call request to get more items.

    No pressure Subscriber

    If you have a use case, where you don't need to apply backpressure and would like to benefit from the reduced complexity, you could implement a LaidBackSubscriber, which:

    • implements onSubscribe by storing the subscription and immediately calling request on it
    • implements onNext by executing a lambda given during construction and then calling subscription.request(1)
    • implements onError and onComplete by executing a lambda given during construction

    That should get you what you wanted.

    General advice

    The Java 9 Flow API was created as an integration point for existing async libraries, not as an invitation to implement reactive components in an ad-hoc fashion. It's great to experiment with, but if you really want to create a reactive system, the existing libraries are likely well-suited.