Recently, I found a good support for Publisher by projectreactor.io:
Flux.create(fluxSink -> {
for (int i = 0; i < 10; i++)
fluxSink.next(i);
fluxSink.complete();
})
.map(...)
.subscribe(...);
Is there any good support for Proccessor? I mean something like or simular:
XXX process = new XXX((inputValue, output) -> {
if(inputValue == 0)
output.error();
else
output.next(inputValue);
});
publisher.subscribe(process);
process.subscribe(...);
If not, how can I implement my own or why I cannot do this?
Update 1:
After discussion (see comments) it appeared that in my use case I need use flatMap
(see answer), my question was Good implementation of processor by this I meant some functionality that if it's fails I able to take control and emit error instead. I think flatMap
will give you quite enough functionality. In my case I used:
import org.jsoup.Jsoup;
Flux.just("url")
.flatMap(url -> {
try {
Document document = Jsoup.connect(url).get();
return Flux.just(document);
} catch (IOException e) {
return Flux.error(e);
}
})
.subscribe();
From what you described of your use case, I don't expect you really require a Processor
. Rather, use flatMap
to trigger the asynchronous URL fetches. flatMap
, like all Reactive Streams operators, will default to stopping immediately in case of error.
The only part where you might require a processor is to generate the Flux<URL>
if you don't know the URLs in advance (otherwise, a Flux.fromIterable
or Flux.just(...)
would do just fine).
If you need to dispatch the result(s) to multiple Subscriber
without re-triggering the requests, have a look at publish().connect()
and/or cache()
.