java spring-boot spring-webflux reactive-programming

Manipulate local variable inside the reactive map and return end result only


I was developing logic that calculates summary details and returns them after a few database operations. It looked like this:

@Override
public Mono<ResponseEntity<?>> generateSummary(Locale local) throws Exception {
    Summary summary = new Summary();

    return test1Repository.findBySession("test").map(session -> {

        return test2Repository.findAllById(session.getListOfObject()).map(data -> {

            if (condition) {
                // logic to update the summary object
            }
            if (condition) {
                // logic to update the summary object
            }
            return summary;
        });
    }).map(result -> ResponseEntity.ok(result));
}

public class Summary {
    private AtomicInteger disconnectCount;
    private AtomicInteger connectCount;
}

As a result, I get a list of Summary objects, one for each element of session.getListOfObject(). Something like this:

[
    {
        "disconnectCount": 1,
        "connectCount": 1
    },
    {
        "disconnectCount": 1,
        "connectCount": 1
    },
    {
        "disconnectCount": 1,
        "connectCount": 1
    }
]

But I want to return one single Summary object after all the manipulations are done. How do I achieve this in a Java reactive environment? Maybe I'm not returning it correctly, but I'm not sure.


Solution

    1. The map operator transforms input elements one by one and returns as many elements as it receives. In your case you want a reduction, i.e. reduce, reduceWith, etc. A reduction merges the elements of the input Flux into a single output element, delivered as a Mono.
    2. You map a Mono to a Flux (i.e. myMono.map(value -> myFlux().map(values -> ...))), which produces a Mono<Flux<MyValue>>. To get a plain Mono, use flatMap, which "flattens" the nested publisher into the outer one (you can learn more about this kind of operation by looking at the concept of a "monad").
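    Both points can be tried out without Reactor on the classpath. The sketch below is only an analogy: it uses java.util.stream.Stream in place of Flux and java.util.Optional in place of Mono, and the class name, method names and boolean "connected" flags are made up for illustration. It shows that map keeps the element count while reduce collapses it, and that map with a wrapper-returning function nests the wrappers while flatMap does not.

```java
import java.util.List;
import java.util.Optional;

public class MapVersusReduce {
    // Hypothetical stand-in for the Summary type from the question.
    record Summary(int disconnectCount, int connectCount) {
        Summary merge(Summary other) {
            return new Summary(disconnectCount + other.disconnectCount,
                               connectCount + other.connectCount);
        }
    }

    // map only: one output per input, so three flags give three summaries.
    static List<Summary> mapOnly(List<Boolean> connected) {
        return connected.stream()
                .map(c -> c ? new Summary(0, 1) : new Summary(1, 0))
                .toList();
    }

    // map + reduce: the partial summaries are merged into a single value.
    static Summary mapThenReduce(List<Boolean> connected) {
        return connected.stream()
                .map(c -> c ? new Summary(0, 1) : new Summary(1, 0))
                .reduce(new Summary(0, 0), Summary::merge);
    }

    // A function that itself returns a wrapper (like a repository call).
    static Optional<Integer> parse(String s) {
        try {
            return Optional.of(Integer.parseInt(s));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // map nests the wrappers: Optional<Optional<Integer>>.
    static Optional<Optional<Integer>> nested(Optional<String> text) {
        return text.map(MapVersusReduce::parse);
    }

    // flatMap flattens them: Optional<Integer>.
    static Optional<Integer> flattened(Optional<String> text) {
        return text.flatMap(MapVersusReduce::parse);
    }

    public static void main(String[] args) {
        List<Boolean> flags = List.of(true, false, true);
        System.out.println(mapOnly(flags).size());        // 3
        System.out.println(mapThenReduce(flags));         // Summary[disconnectCount=1, connectCount=2]
        System.out.println(nested(Optional.of("42")));    // Optional[Optional[42]]
        System.out.println(flattened(Optional.of("42"))); // Optional[42]
    }
}
```

    The nested Optional<Optional<Integer>> plays the role of the Mono<Flux<MyValue>> you are getting, and the three-element list is the stream counterpart of the three-element JSON array in the question.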

    Here is a simple example based on your snippet:

    public record Summary(int disconnectCount, int connectCount) {
        Summary() { this(0, 0); }

        // Combine two partial summaries into a new one.
        Summary merge(Summary other) {
            return new Summary(disconnectCount + other.disconnectCount,
                               connectCount + other.connectCount);
        }
    }

    @Override
    public Mono<ResponseEntity<?>> generateSummary(Locale local) throws Exception {
        return test1Repository.findBySession("test")
            // flatMap flattens the inner Mono<Summary> into the outer Mono.
            .flatMap(session -> test2Repository.findAllById(session.getListOfObject())
                // isConnection() stands in for your own conditions.
                .map(data -> data.isConnection() ? new Summary(0, 1) : new Summary(1, 0))
                // Seeded reduce: an empty Flux still yields Summary(0, 0).
                .reduce(new Summary(), Summary::merge))
            .map(result -> ResponseEntity.ok(result));
    }
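    One detail to keep in mind with reductions is the empty case. In Reactor, reduce(BiFunction) completes without emitting anything when the source Flux is empty, whereas reduce(initial, BiFunction) emits the initial value. The two Stream.reduce overloads in the JDK behave analogously, so the difference can be sketched without Reactor. The class and method names below are hypothetical, and the Summary record is a stand-in for the one used in this answer.

```java
import java.util.List;
import java.util.Optional;

public class EmptyInputReduce {
    // Hypothetical stand-in for the Summary record.
    record Summary(int disconnectCount, int connectCount) {
        Summary merge(Summary other) {
            return new Summary(disconnectCount + other.disconnectCount,
                               connectCount + other.connectCount);
        }
    }

    // Unseeded reduce: with no elements there is nothing to return,
    // so the result is an empty Optional (an empty Mono in Reactor).
    static Optional<Summary> unseeded(List<Summary> parts) {
        return parts.stream().reduce(Summary::merge);
    }

    // Seeded reduce: the starting value is returned for empty input.
    static Summary seeded(List<Summary> parts) {
        return parts.stream().reduce(new Summary(0, 0), Summary::merge);
    }

    public static void main(String[] args) {
        List<Summary> none = List.of();
        List<Summary> two = List.of(new Summary(1, 0), new Summary(0, 1));

        System.out.println(unseeded(none)); // Optional.empty
        System.out.println(seeded(none));   // Summary[disconnectCount=0, connectCount=0]
        System.out.println(seeded(two));    // Summary[disconnectCount=1, connectCount=1]
    }
}
```

    If the session can hold an empty list, the seeded form is probably the safer choice, since the caller then always receives a Summary with zero counts instead of no value at all.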