
Extract variable from spring webflux reactive pipeline


I am working with reactive streams using Spring WebFlux. I want to extract a variable (name) from the middle of the reactive pipeline and use it later in the chain, as follows.

public class Example {

    public Mono<String> test() {
        String name;

        return Mono.just("some random string")
                    .map(s -> {
                        // compile error: local variables referenced from a lambda
                        // must be final or effectively final
                        name = s.toUpperCase();
                        return name;
                    }).map(...)
                      .map(...)
                      .flatMap(...)
                      .map(...)
                      .map(result -> result + name)
                      .doOnSuccess(res -> asyncPublish(name));
    }

    public void asyncPublish(String name) {
        // logic to write to a messaging queue asynchronously
    }
}

The code above does not work, because name is assigned inside a lambda. It is a contrived example, but it shows what I want to achieve.

Note: I don't want to use multiple zip operators just to carry the name all the way to the last map where I want to use it. Is there a way to store it in a variable, as shown above, and then use it wherever I need it?


Solution

  • You could use, for example, a Tuple2 to carry the value of name through the chain alongside the transformed data.

    return Mono.just("some random string")
                .map(s -> s.toUpperCase())
                .map(s -> Tuples.of(s, x(s))) // x(s) stands for the transformation this map step performs
                .map(...) // keep the Tuple, with the value of `name` in the first slot
                .flatMap(...) // keep the Tuple, with the value of `name` in the first slot
                .map(resultTuple -> Tuples.of(resultTuple.getT1(), resultTuple.getT2() + resultTuple.getT1())) // `name` stays in the first slot
                .doOnSuccess(resultTuple -> asyncPublish(resultTuple.getT1()))
                .map(resultTuple -> resultTuple.getT2()); // only needed if the returned Mono should contain the modified value
    

    Tuples is in the package reactor.util.function, which is part of reactor-core.
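
    Because the snippet above elides the intermediate steps, here is a minimal runnable sketch of the same Tuple approach. The intermediate transformations (taking the length, doubling it) and the in-memory published list that stands in for the message queue are hypothetical and exist only to make the example self-contained. It assumes reactor-core is on the classpath.

```java
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class TupleCarryExample {

    // Hypothetical stand-in for the real message queue: records what was published.
    static final List<String> published = new CopyOnWriteArrayList<>();

    static void asyncPublish(String name) {
        published.add(name);
    }

    static Mono<String> test() {
        return Mono.just("some random string")
                .map(String::toUpperCase)
                // T1 carries the name unchanged; T2 holds the value being transformed.
                .map(name -> Tuples.of(name, name.length()))
                // Hypothetical intermediate step: transform T2 only and re-wrap with T1.
                .flatMap(t -> Mono.just(t.getT2() * 2)
                        .map(doubled -> Tuples.of(t.getT1(), doubled)))
                // The name is available here without any variable outside the chain.
                .map(t -> Tuples.of(t.getT1(), t.getT2() + ":" + t.getT1()))
                .doOnSuccess(t -> asyncPublish(t.getT1()))
                // Drop the carried name so callers only see the final value.
                .map(Tuple2::getT2);
    }

    public static void main(String[] args) {
        System.out.println(test().block());
    }
}
```

    Every step that changes the data has to rebuild the Tuple, which is the cost of this approach. In exchange, the pipeline holds no state outside the chain itself.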

    Another way, without passing the value through the chain in a Tuple, is to use an AtomicReference, although I still think the Tuple approach is cleaner. The AtomicReference version might look like this:

    public Mono<String> test() {
        final AtomicReference<String> nameRef = new AtomicReference<>();

        return Mono.just("some random string")
                    .map(s -> {
                        final String name = s.toUpperCase();
                        nameRef.set(name);
                        return name;
                    }).map(...)
                      .map(...)
                      .flatMap(...)
                      .map(...)
                      .map(result -> result + nameRef.get())
                      .doOnSuccess(res -> asyncPublish(nameRef.get()));
    }

    public void asyncPublish(String name) {
        // logic to write to a messaging queue asynchronously
    }
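
    One caveat with the AtomicReference version: the reference is created once per call to test(), so every subscription to the returned Mono (for example after a retry) shares it. The sketch below is one possible way around that, not part of the original answer: it wraps the pipeline in Mono.defer so that each subscription gets its own reference. The input parameter, the length step, and the in-memory published list are hypothetical stand-ins, and reactor-core is assumed to be on the classpath.

```java
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

public class DeferredRefExample {

    // Hypothetical stand-in for the real message queue: records what was published.
    static final List<String> published = new CopyOnWriteArrayList<>();

    static void asyncPublish(String name) {
        published.add(name);
    }

    static Mono<String> test(String input) {
        // Mono.defer runs the supplier once per subscription,
        // so each subscriber gets a fresh AtomicReference.
        return Mono.defer(() -> {
            final AtomicReference<String> nameRef = new AtomicReference<>();
            return Mono.just(input)
                    .map(s -> {
                        final String name = s.toUpperCase();
                        nameRef.set(name);
                        return name;
                    })
                    // Hypothetical intermediate step that loses the original name.
                    .map(String::length)
                    .map(len -> len + ":" + nameRef.get())
                    .doOnSuccess(res -> asyncPublish(nameRef.get()));
        });
    }

    public static void main(String[] args) {
        System.out.println(test("abc").block());
    }
}
```

    Whether the extra defer is worth it depends on how the Mono is consumed. For a Mono that is subscribed exactly once, the simpler version above behaves the same way.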