Search code examples
javaasynchronousproject-reactorreactive-streams

Project Reactor: doOnNext (or the others doOnXXX) async


Is there any method like doOnNext, but async? For example, I need to do some long logging (sent notification by email) for determined element.

Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);

Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
        .publishOn(myParallel)
        .doOnNext(v -> {
            // For example, we need to do something time-consuming only for 3

            if (v.equals(3)) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.println("LOG FOR " + v);
        });

ints.subscribe(System.out::println);

But why should I wait for logging of 3? I want to do this logic asynchronously.

Now I have only this solution

Thread.sleep(10000);

Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);
Scheduler myParallel2 = Schedulers.newParallel("my-parallel2", 4);

Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
        .publishOn(myParallel)
        .doOnNext(v -> {
            Mono.just(v).publishOn(myParallel2).subscribe(value -> {
                if (value.equals(3)) {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                System.out.println("LOG FOR " + value);
            });
        });

ints.subscribe(System.out::println);

Is there any "nice" solution for this?


Solution

  • If you're absolutely sure you don't care wether or not the email sending succeeds, then you could use "subscribe-inside-doOnNext" but I'm pretty confident that would be a mistake.

    In order to have your Flux propagate an onError signal if the "logging" fails, the recommended approach is to use flatMap.

    The good news is that since flatMap merges results from the inner publishers immediately into the main sequence, you can get still emit each element immediately AND trigger the email. The only caveat is that the whole thing will only complete once the email-sending Mono has also completed. You can also check within the flatMap lambda if the logging needs to happen at all (rather than inside the inner Mono):

    //assuming sendEmail returns a Mono<Void>, takes care of offsetting any blocking send onto another Scheduler
    
    source //we assume elements are also publishOn as relevant in `source`
       .flatMap(v -> {
           //if we can decide right away wether or not to send email, better do it here
           if (shouldSendEmailFor(v)) {
               //we want to immediately re-emit the value, then trigger email and wait for it to complete
               return Mono.just(v)
                   .concatWith(
                       //since Mono<Void> never emits onNext, it is ok to cast it to V
                       //which makes it compatible with concat, keeping the whole thing a Flux<V>
                       sendEmail(v).cast(V.class)
                   );
           } else {
               return Mono.just(v);
           }
        });