reactive-programming, spring-webflux, project-reactor

Returning Mono response from subscribe of Mono.fromCallable


What I am trying to accomplish is to return a simple Mono<Response>. I call several backend APIs in the method detailsHandler.fetchDetailsValue. Since this is a synchronous, blocking call, I wrap it in Mono.fromCallable, as suggested in the documentation.

But I get this error when compiling:

error: local variables referenced from a lambda expression must be final or effectively final

Inside the .subscribe lambda I am trying to assign to a Response object that is declared outside the lambda. Since I need to fill it with the objects returned from the fetchDetailsValue method upon subscription, how can I return this Response object?

Please correct me if anything below is wrong and suggest how to fix it. I appreciate any input. Thanks!

Below is the sample code -

        @Override
        public Mono<Response> getDetails(Mono<RequestDO> requestDO) {

            return requestDO.flatMap(request -> {
                Response response = new Response();
                Mono<List<Object>> optionalMono = Mono.fromCallable(() -> {
                    return detailsHandler.fetchDetailsValue(request);
                });
                optionalMono.subscribeOn(Schedulers.boundedElastic())
                        .subscribe(result -> {
                            Cat1 cat1Object = null;
                            Cat2 cat2Object = null;
                            for (Object obj : result) {
                                if (obj instanceof Cat1) {
                                    cat1Object = (Cat1) obj;
                                    response.addResponseObj(cat1Object); // error: local variables referenced from a lambda expression must be final or effectively final
                                }
                                if (obj instanceof Cat2) {
                                    cat2Object = (Cat2) obj;
                                    response.addResponseObj(cat2Object); // error: local variables referenced from a lambda expression must be final or effectively final
                                }
                            }
                        });
                return Mono.just(response);
            });
        }
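
For reference, the rule behind this compiler message can be reproduced without Reactor. The sketch below uses only the JDK, and the class and method names are made up for illustration. It suggests that calling a method on a captured object is allowed as long as the variable itself is never reassigned, so the error usually points at a reassignment somewhere in the enclosing method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class EffectivelyFinalSketch {

    // Collects values through a lambda without reassigning any captured local.
    static List<String> collect(List<String> input) {
        List<String> collected = new ArrayList<>(); // assigned once, so effectively final
        Consumer<String> sink = value -> collected.add(value); // mutating the object is allowed
        // collected = new ArrayList<>(); // uncommenting this reassignment breaks the capture above
        input.forEach(sink);
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(collect(Arrays.asList("cat1", "cat2"))); // prints [cat1, cat2]
    }
}
```

Compiling is not the same as being correct, though: in asynchronous code the enclosing method can return the object before the callback has filled it.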

I then tried declaring the Response object inside the subscribe lambda and returning it once the value is received, but I get the error: Void methods cannot return a value

Below is the code -

        @Override
        public Mono<Response> getDetails(Mono<RequestDO> requestDO) {

            return requestDO.flatMap(request -> {
                Mono<List<Object>> optionalMono = Mono.fromCallable(() -> {
                    return detailsHandler.fetchDetailsValue(request);
                });
                optionalMono.subscribeOn(Schedulers.boundedElastic())
                        .subscribe(result -> {
                            Response response = new Response(); // Added this inside subscribe lambda. But now getting - Void methods cannot return a value
                            Cat1 cat1Object = null;
                            Cat2 cat2Object = null;
                            for (Object obj : result) {
                                if (obj instanceof Cat1) {
                                    cat1Object = (Cat1) obj;
                                    response.addResponseObj(cat1Object);
                                }
                                if (obj instanceof Cat2) {
                                    cat2Object = (Cat2) obj;
                                    response.addResponseObj(cat2Object);
                                }
                            }
                            return Mono.just(response); // Added this inside subscribe lambda. But now getting - Void methods cannot return a value
                        });

            });
        }
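
The "Void methods cannot return a value" message follows from the shape of the callback: subscribe accepts a Consumer, whose lambda returns nothing, while map accepts a Function, whose lambda returns a value. A minimal JDK-only sketch of the two shapes follows. The helper names runConsumer and runFunction are hypothetical and stand in for the operators.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class CallbackShapeSketch {

    // Shaped like the lambda given to subscribe: accepts a value, returns nothing.
    static void runConsumer(List<Object> result, Consumer<List<Object>> onNext) {
        onNext.accept(result);
    }

    // Shaped like the lambda given to map: accepts a value, returns a new one.
    static <R> R runFunction(List<Object> result, Function<List<Object>, R> mapper) {
        return mapper.apply(result);
    }

    public static void main(String[] args) {
        List<Object> result = Arrays.asList("a", "b");
        // runConsumer(result, r -> { return r.size(); }); // does not compile: a Consumer body cannot return a value
        runConsumer(result, r -> System.out.println("consumed " + r.size()));
        Integer size = runFunction(result, r -> r.size());
        System.out.println("mapped to " + size);
    }
}
```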

UPDATE:

When I tried the code below, I got errors. Please correct anything I am doing wrong.

    public Mono<Response> getDetails(Mono<RequestDO> requestDO) {
    
        return requestDO
                .flatMap(request -> Mono.fromCallable(() -> detailsHandler.fetchDetailsValue(request)))
                .map(result -> {
                    Response response = new Response();
                    for (Object obj : result) {
                        if (obj instanceof Cat1) {
                            response.addResponseObj((Cat1) obj);
                        }
                        if (obj instanceof Cat2) {
                            response.addResponseObj((Cat2) obj);
                        }
                    }
                    return response;
                })
                .map(result1 -> {
                    Response response = resultnew;
                    requestDO.flatMap(request -> Mono.fromCallable(() -> detailsHandler.fetchAdditionalValue(request, response)))
                .map(result2 -> {
                    return result2;
                });
        }

Solution

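  • You should not call subscribe inside your Reactor pipeline. Treat subscribe as a terminal operation: it starts the pipeline asynchronously, at an unknown time in the future, and should only serve to connect the pipeline to some other part of your system. Its callback returns void, so nothing can be returned from it, and a value created outside it may be handed back to the caller before the callback has run.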

    What you want is to transform your List<Object> into a new Response using a simple synchronous function. The map operator is made for this:

    public Mono<Response> getDetails(Mono<RequestDO> requestDO) {
    
        return requestDO
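                .flatMap(request -> Mono.fromCallable(() -> detailsHandler.fetchDetailsValue(request))
                        // fetchDetailsValue blocks, so keep it on the bounded elastic scheduler, as in the question
                        .subscribeOn(Schedulers.boundedElastic()))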
                .map(result -> {
                    Response response = new Response();
                    for (Object obj : result) {
                        if (obj instanceof Cat1) {
                            response.addResponseObj((Cat1) obj);
                        }
                        if (obj instanceof Cat2) {
                            response.addResponseObj((Cat2) obj);
                        }
                    }
                    return response;
                });
    }
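
To try the idea without a Reactor dependency, here is a rough JDK-only analogue that uses CompletableFuture as a stand-in for Mono. It is a sketch under assumptions: Cat1, Cat2, Response and fetchDetailsValue are hypothetical placeholders for the question's types, supplyAsync only approximates fromCallable plus subscribeOn (it starts eagerly, whereas a Mono is lazy), and thenApply plays the role of map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class MapStageSketch {

    // Hypothetical stand-ins for the question's types.
    static class Cat1 {}
    static class Cat2 {}
    static class Response {
        final List<Object> items = new ArrayList<>();
        void addResponseObj(Object obj) { items.add(obj); }
    }

    // Hypothetical stand-in for the blocking detailsHandler.fetchDetailsValue call.
    static List<Object> fetchDetailsValue(String request) {
        return Arrays.asList(new Cat1(), "ignored", new Cat2());
    }

    // Pure synchronous transformation: the body of the map lambda.
    static Response toResponse(List<Object> result) {
        Response response = new Response();
        for (Object obj : result) {
            if (obj instanceof Cat1 || obj instanceof Cat2) {
                response.addResponseObj(obj);
            }
        }
        return response;
    }

    // The value never leaves the pipeline; the caller gets the future and decides when to wait.
    static CompletableFuture<Response> getDetails(String request) {
        return CompletableFuture
                .supplyAsync(() -> fetchDetailsValue(request)) // roughly fromCallable + subscribeOn
                .thenApply(MapStageSketch::toResponse);        // roughly map
    }

    public static void main(String[] args) {
        System.out.println(getDetails("req").join().items.size()); // prints 2
    }
}
```

Keeping the transformation in a separate method such as toResponse also makes it testable without any asynchronous machinery.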
    

    Update

    For your updated question, you want to use both the request and the response to call another Mono. You can do this by first moving the map inside the flatMap, so that request stays in scope, and then adding another flatMap after it:

    public Mono<Response> getDetails(Mono<RequestDO> requestDO) {
    
        return requestDO
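                .flatMap(request -> Mono.fromCallable(() -> detailsHandler.fetchDetailsValue(request))
                        // fetchDetailsValue blocks, so keep it on the bounded elastic scheduler, as in the question
                        .subscribeOn(Schedulers.boundedElastic())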
                        .map(result -> {
                            Response response = new Response();
                            for (Object obj : result) {
                                if (obj instanceof Cat1) {
                                    response.addResponseObj((Cat1) obj);
                                }
                                if (obj instanceof Cat2) {
                                    response.addResponseObj((Cat2) obj);
                                }
                            }
                            return response;
                        })
                        .flatMap(response -> Mono.fromCallable(() -> detailsHandler.fetchAdditionalValue(request, response))));
    }
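
The scoping trick can be seen in isolation with a JDK-only analogue, again using CompletableFuture as a stand-in for Mono. This is an illustrative sketch, not the Reactor code itself: both fetch methods are hypothetical string-based placeholders, thenCompose plays the role of flatMap, and thenApply plays the role of map.

```java
import java.util.concurrent.CompletableFuture;

class NestedScopeSketch {

    // Hypothetical stand-ins for the two backend calls.
    static String fetchDetailsValue(String request) {
        return "details(" + request + ")";
    }

    static String fetchAdditionalValue(String request, String response) {
        return response + "+additional(" + request + ")";
    }

    static CompletableFuture<String> getDetails(CompletableFuture<String> requestFuture) {
        return requestFuture.thenCompose(request ->                        // roughly the outer flatMap
                CompletableFuture
                        .supplyAsync(() -> fetchDetailsValue(request))
                        .thenApply(details -> "response[" + details + "]") // roughly map
                        // request is still visible here because this stage is nested in the outer lambda
                        .thenCompose(response -> CompletableFuture
                                .supplyAsync(() -> fetchAdditionalValue(request, response))));
    }

    public static void main(String[] args) {
        String out = getDetails(CompletableFuture.completedFuture("r1")).join();
        System.out.println(out); // prints response[details(r1)]+additional(r1)
    }
}
```

Had the second thenCompose been chained after the outer one instead of inside it, request would no longer be in scope, which is the problem the updated question ran into.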