Search code examples
reactive-programmingjava-9project-reactorreactive-streamsjava-flow

Good implementation/support for java.util.concurrent.Flow.Processor<T,R>


Recently, I found a good support for Publisher by projectreactor.io:

Flux.create(fluxSink -> {
           for (int i = 0; i < 10; i++)
            fluxSink.next(i);
           fluxSink.complete();
        })
                .map(...)
                .subscribe(...);

Is there any good support for Proccessor? I mean something like or simular:

XXX process = new XXX((inputValue, output) -> {
    if(inputValue == 0)
       output.error();
    else
       output.next(inputValue);
});

publisher.subscribe(process);  
process.subscribe(...);

If not, how can I implement my own or why I cannot do this?

Update 1:

After discussion (see comments) it appeared that in my use case I need use flatMap (see answer), my question was Good implementation of processor by this I meant some functionality that if it's fails I able to take control and emit error instead. I think flatMap will give you quite enough functionality. In my case I used:

        import org.jsoup.Jsoup;

        Flux.just("url")
            .flatMap(url -> {
                try {
                    Document document = Jsoup.connect(url).get();
                    return Flux.just(document);
                } catch (IOException e) {
                    return Flux.error(e);
                }
            })
            .subscribe();

Solution

  • From what you described of your use case, I don't expect you really require a Processor. Rather, use flatMap to trigger the asynchronous URL fetches. flatMap, like all Reactive Streams operators, will default to stopping immediately in case of error.

    The only part where you might require a processor is to generate the Flux<URL> if you don't know the URLs in advance (otherwise, a Flux.fromIterable or Flux.just(...) would do just fine).

    If you need to dispatch the result(s) to multiple Subscriber without re-triggering the requests, have a look at publish().connect() and/or cache().