Tags: java, reactive-programming, project-reactor

How to run functions over elements inside flux in parallel?


I am new to Project Reactor. Is it possible to run functions over the elements of a Flux in parallel?

For example, I have a Flux<Dog> and two methods, setAge and setName, that each do some calculation and store the result in the age and name fields of a Dog. The methods are independent, and I want them to execute at the same time.

Flux.fromIterable(mapToResult())
    .parallel(10)
    .runOn(Schedulers.parallel())
    .flatMap(result -> Mono.zip(
            setAge(result)
                .onErrorResume(err -> {
                    err.printStackTrace();
                    return Mono.justOrEmpty(result);
                }),
            setName(result)
                .onErrorComplete()
                .onErrorResume(err -> {
                    err.printStackTrace();
                    return Mono.justOrEmpty(result);
                }))
        .map(Tuple2::getT2))
    .subscribe();

I tried the code above, but it runs setAge first and only then setName. Is it possible for setName not to wait for setAge?

Note: inside setAge I call Thread.sleep() with a large value.

setAge method

private static Mono<Dog> setAge(Dog dog) {
    return Mono.fromSupplier(() -> {
        log.info("Setting age "+dog);
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        dog.age = 10;
        return dog;
    });
}

With CompletableFuture, I would need something like this to achieve what I want:

.thenCompose(result -> setAge(result).thenCombine(setName(result), (r1, r2) -> result))
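
For reference, the thenCombine pattern above can be written out as a complete program. This is only a minimal sketch, not the asker's actual code: the Dog class, the 300 ms sleeps, the name "Rex", and the populate helper are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical Dog type; the fields are volatile because two threads write them.
class Dog {
    volatile int age;
    volatile String name;
}

class ParallelSetters {
    // Stand-in for the slow setAge from the question (300 ms instead of 10 s).
    static CompletableFuture<Dog> setAge(Dog dog) {
        return CompletableFuture.supplyAsync(() -> {
            pause(300);
            dog.age = 10;
            return dog;
        });
    }

    // Stand-in for setName; the value "Rex" is made up for this sketch.
    static CompletableFuture<Dog> setName(Dog dog) {
        return CompletableFuture.supplyAsync(() -> {
            pause(300);
            dog.name = "Rex";
            return dog;
        });
    }

    // Sleeps and restores the interrupt flag if the thread is interrupted.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Both futures are created before either is awaited, so the two
    // tasks run on separate threads at the same time.
    static Dog populate(Dog dog) {
        return CompletableFuture.completedFuture(dog)
                .thenCompose(result ->
                        setAge(result).thenCombine(setName(result), (r1, r2) -> result))
                .join();
    }

    public static void main(String[] args) {
        Dog dog = populate(new Dog());
        System.out.println(dog.age + " " + dog.name);
    }
}
```

Because supplyAsync hands each task to another thread right away, setName starts without waiting for setAge to finish.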

Solution

  • Thread.sleep should not be used. Use one of the delay operators that Project Reactor provides instead, for example delayElement or delaySubscription.
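
The point about delay operators could look roughly like the following. Treat it as a hedged sketch rather than the definitive fix: it assumes reactor-core is on the classpath, and the ReactorDog class, the 300 ms delays, the name "Rex", and the populate helper are hypothetical. Since delayElement schedules a timer instead of blocking the subscribing thread, Mono.zip can subscribe to both sources immediately.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

// Hypothetical Dog type for this sketch; fields are written on timer threads.
class ReactorDog {
    volatile int age;
    volatile String name;
}

class NonBlockingSetters {
    // Simulated latency with delayElement instead of Thread.sleep:
    // the element is emitted later by a timer and no thread is blocked.
    static Mono<ReactorDog> setAge(ReactorDog dog) {
        return Mono.just(dog)
                .delayElement(Duration.ofMillis(300))
                .map(d -> {
                    d.age = 10;
                    return d;
                });
    }

    // Same idea for the name; the value "Rex" is made up.
    static Mono<ReactorDog> setName(ReactorDog dog) {
        return Mono.just(dog)
                .delayElement(Duration.ofMillis(300))
                .map(d -> {
                    d.name = "Rex";
                    return d;
                });
    }

    // zip subscribes to both Monos up front, so the two delays overlap.
    static Mono<ReactorDog> populate(ReactorDog dog) {
        return Mono.zip(setAge(dog), setName(dog))
                .map(tuple -> tuple.getT2());
    }

    public static void main(String[] args) {
        // block() is used only to keep this demo alive until the result arrives.
        ReactorDog dog = populate(new ReactorDog()).block();
        System.out.println(dog.age + " " + dog.name);
    }
}
```

In the original attempt, Mono.fromSupplier with Thread.sleep blocks during subscription, so zip cannot subscribe to setName until setAge has returned.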