Search code examples

Good implementation/support for java.util.concurrent.Flow.Processor<T,R>

Recently, I found a good support for Publisher by

Flux.create(fluxSink -> {
           for (int i = 0; i < 10; i++)

Is there any good support for Proccessor? I mean something like or simular:

XXX process = new XXX((inputValue, output) -> {
    if(inputValue == 0)


If not, how can I implement my own or why I cannot do this?

Update 1:

After discussion (see comments) it appeared that in my use case I need use flatMap (see answer), my question was Good implementation of processor by this I meant some functionality that if it's fails I able to take control and emit error instead. I think flatMap will give you quite enough functionality. In my case I used:

        import org.jsoup.Jsoup;

            .flatMap(url -> {
                try {
                    Document document = Jsoup.connect(url).get();
                    return Flux.just(document);
                } catch (IOException e) {
                    return Flux.error(e);


  • From what you described of your use case, I don't expect you really require a Processor. Rather, use flatMap to trigger the asynchronous URL fetches. flatMap, like all Reactive Streams operators, will default to stopping immediately in case of error.

    The only part where you might require a processor is to generate the Flux<URL> if you don't know the URLs in advance (otherwise, a Flux.fromIterable or Flux.just(...) would do just fine).

    If you need to dispatch the result(s) to multiple Subscriber without re-triggering the requests, have a look at publish().connect() and/or cache().