I start playing with Java 9 Flow API, and the first thing that I found and dont like, it´s seems like we cannot use lambdas when we pass a subscriber implementation into the publisher as we can do with RxJava
So I have to define and implement my own Subscriber class
public class CustomSubscriber<T> implements Flow.Subscriber<T> {
protected Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("Subscription done:");
subscription.request(1);
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
And then just pass it to my publisher
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(new CustomSubscriber<>());
This is really verbose, and as I understand it´s because we need to set the subscription in the onSubscribe
callback
protected Flow.Subscription subscription;
To later be used in the onNext
to continue the emissions subscription.request(1);
I still dont get it why this mechanism it´s needed, but it´s avoiding the use of Lambdas as we do in RxJava as this example
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
publisher.subscribe(item -> System.out.println("do something in the onNext"),
e -> System.out.println("do something in the onError"),
() -> System.out.println("Do something in the onComplete"));
I guess this is not possible and I´m not missing nothing here right?
I still dont get it why this mechanism it´s needed
The subscription enables communication from subscriber to publisher. The request
method allows the subscriber to apply backpressure, informing upstream components that it is overloaded and "needs a break". In order to do that, the subscriber needs to hold on to an instance of the subscription and needs to occasionally call request
to get more items.
Subscriber
If you have a use case, where you don't need to apply backpressure and would like to benefit from the reduced complexity, you could implement a LaidBackSubscriber
, which:
onSubscribe
by storing the subscription and immediately calling request
on itonNext
by executing a lambda given during construction and then calling subscription.request(1)
onError
and onComplete
by executing a lambda given during constructionThat should get you what you wanted.
The Java 9 Flow API was created as an integration point for existing async libraries, not as an invitation to implement reactive components in an ad-hoc fashion. It's great to experiment with, but if you really want to create a reactive system, the existing libraries are likely well-suited.