Search code examples
javaproject-reactorside-effectspublisher

How to return a Mono<ServerResponse> (as a side effect?) of Mono.subscribe()?


I have the following code, which "works"...so far. By "works", I mean that the Flux<DemoPOJO> is being returned by service.getAll(), and the "hasElements().subscribe(this::foo)" results in foo() generating output that correctly reflects whether the Flux<DemoPOJO> has any elements.

The desired end state is to return a ServerResponse object, wrapping the Flux<DemoPOJO>, which reflects whether the returned Flux is empty or "hasElements".

My problem is that Mono.subscribe() returns a reactor.core.Disposable, and I want to somehow get to a Mono<ServerResponse>. Or, am I "barking up the wrong tree"?

Add Note: I've seen some examples using Flux.flatMap(), but this seems problematic if the returned Flux has a lot of elements (i.e., checking hasElements() seems a lot better than potentially flat-mapping all the elements).

@Component
public class DemoPOJOHandler {

    public static final String PATH_VAR_ID = "id";

    @Autowired
    private DemoPOJOService service;

    public Mono<ServerResponse> getAll(ServerRequest request) {
        Mono<ServerResponse> response = null;
        Flux<DemoPOJO>       entities = service.getAll();

        entities.hasElements().subscribe(this::foo);
        // just return something, for now
        return ServerResponse.ok().build();
    }

    private Mono<ServerRequest> foo(Boolean hasElements) {
        System.out.println("DEBUG >> Mono has elements -> " + hasElements);
        return Mono.empty();
    }
}

Here is the DemoPOJOService implementation...

@Component
public class DemoPOJOService {

    @Autowired
    private DemoPOJORepo demoPOJORepo;

    public Flux<DemoPOJO> getAll() {
        return Flux.fromArray(demoPOJORepo.getAll());
    }

    // more implementation, omitted for brevity
}

And, here is the DemoPOJORepo implementation...

@Component
public class DemoPOJORepo {

    private static final int NUM_OBJS =20;

    private static DemoPOJORepo demoRepo = null;

    private Map<Integer, DemoPOJO> demoPOJOMap;

    private DemoPOJORepo() {
        initMap();
    }

    public static DemoPOJORepo getInstance() {
        if (demoRepo == null) {
            demoRepo = new DemoPOJORepo();
        }
        return demoRepo;
    }

    public DemoPOJO[] getAll() {
        return demoPOJOMap.values().toArray(new DemoPOJO[demoPOJOMap.size()]);
    }

    // more implementation, omitted for brevity

    private void initMap() {
        demoPOJOMap = new TreeMap<Integer, DemoPOJO>();

        for(int ndx=1; ndx<( NUM_OBJS + 1 ); ndx++) {
            demoPOJOMap.put(ndx, new DemoPOJO(ndx, "foo_" + ndx, ndx+100));
        }
    }
}

Solution

  • @SoCal your answer might seem to work, but it suffers from a downside: getAll() DB call is made twice.

    The difficulty is that you can only decide on the status code once you've started receiving data.

    But since you don't seem to really need the asynchronous nature of the body (you're not streaming the individual elements but producing a one shot JSON response), you could in this case collect the whole result set and map it to a response.

    So call the DB, collect the elements in a Mono<List>, map that to A) an 404 empty response if the list is empty or B) a 200 successful JSON response otherwise (notice the use of syncBody):

    @Component
    public class DemoPOJOHandler {
    
        public static final String PATH_VAR_ID = "id";
    
        @Autowired
        private DemoPOJOService service;
    
        public Mono<ServerResponse> getAll(ServerRequest request) {
            Flux<DemoPOJO> entities = service.getAll();
            Mono<List<DemoPOJO>> collected = entities.collectList();
    
            return collected.map(list -> list.isEmpty() ? 
                ServerResponse.noContent().build() :
                ServerResponse.ok()
                    .contentType(MediaType.APPLICATION_JSON)
                    .syncBody(list)
            );
        }
    }
    

    Side note: I think ResponseEntity is the preferred type for annotated controllers rather than ServerResponse, see https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux-ann-responseentity.