Search code examples
springspring-bootspring-webfluxreactive-streams

How to correctly chain Mono/Flux calls


I'm having trouble with understanding how to achieve my goal with reactive approach. Let's assume that I have a Controller, that will return Flux:

@PostMapping(value = "/mutation/stream/{domainId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Mutation> getMutationReactive(@RequestBody List<MutationRequest> mutationRequests, @PathVariable Integer domainId) {
    return mutationService.getMutations(mutationRequests, domainId);
}

In service, currently with .subscribeOn(Schedulers.boundedElastic()), because it calls for a blocking code that is wrapped into a Callable.

public Flux<Mutation> getMutations(List<MutationRequest> mutationRequests, int domainId) {
        return Flux.fromIterable(mutationRequests)
                .subscribeOn(Schedulers.boundedElastic())
                .flatMap(mutationRequest -> getMutation(mutationRequest.getGameId(), mutationRequest.getTypeId(), domainId));
}

getMutation() with blocking calls, currently wrapped into a Callable:

 private Mono<Mutation> getMutation(int gameId, int typeId, int domainId) {
        return Mono.fromCallable(() -> {
            Mutation mutation = mutationProvider.findByGameIdAndTypeId(gameId, typeId).block(); // mutationProvider.findByGameIdAndTypeId() returns Mono<Mutation>
            if (mutation == null) {
                throw new RuntimeException("Mutation was not found by gameId and typeId");
            }
            State state = stateService.getStateByIds(mutation.getId()), domainId).blockFirst(); //stateService.getStateByIds() returns Mono<State>
            if (state == null || state.getValue() == null) {
                log.info("Requested mutation with gameId[%s] typeId[%s] domainId[%s] is disabled. Value is null.".formatted(gameId, typeId, domainId));
                return null;
            }
            mutation.setTemplateId(state.getTemplateId());
            return (mutation);
        });

    }

How do I approach the getMutation() function to use reactive streams, instead of using .block() methods inside a Callable? Basically, I first need to retrieve Mutation from DB -> then using ID of mutation, get its state from other service -> then if state and its value are not null, set templateId of state to mutation and return, or return null.

I've tried something like this:

private Mono<Mutation> getMutation(int gameId, int typeId, int domainId) {
    return mutationProvider.findByGameIdAndTypeId(gameId, typeId)
                .flatMap(mutation -> {
                    stateService.getStatesByIds(mutation.getId(), domainId).flatMap(state -> {
                        if (state != null && state.getValue() != null) {
                            mutation.setTemplateId(state.getTemplateId());
                        }
                        //TODO if state/value is null -> need to propagate further to return null instead of mutation...
                        return Mono.justOrEmpty(state);
                    });
                    return Mono.just(mutation);
                });
}

But it's obviously incorrect, nothing is subscribed to stateService.getStatesByIds(mutation.getId()), domainId) AND I would like to return a null if the retrieved state of mutation or its value are null.


Solution

  • You are ignoring the value of the inner flatMap hence the warning.

    Without trying you need something like this

    private Mono<Mutation> getMutation(int gameId, int typeId, int domainId) {
        return mutationProvider.findByGameIdAndTypeId(gameId, typeId)
                    .flatMap(mutation -> {
                        return stateService.getStatesByIds(mutation.getId(), domainId).flatMap(state -> {
                            if (state != null && state.getValue() != null) {
                                mutation.setTemplateId(state.getTemplateId());
                                return Mono.just(mutation);
                            }
                            return Mono.empty();
                        });
                    });
    }
    

    Although not sure if you could rewrite the outer flatMap not to a regular map instead and you might want to use filter and defaultIfEmpty with that as well

    private Mono<Mutation> getMutation(int gameId, int typeId, int domainId) {
        return mutationProvider.findByGameIdAndTypeId(gameId, typeId)
                    .flatMap(mutation -> {
                        return stateService.getStatesByIds(mutation.getId(), domainId)
        .filter(state -> state != null && state.getValue() != null)
        .flatMap(state -> {
            mutation.setTemplateId(state.getTemplateId());
            return Mono.just(mutation);})
        .defaultIfEmpty(Mono.empty());
    }
    

    This is just from the top of my head and I have no idea what some of the return types are here (Flux or Mono) for your own APIs.