Search code examples
exceptionspring-webfluxfluxflow-control

Spring Webflux cancel pipe and return flux


I am going to explain I case I don't know how to solve without using an exception. The case is the following, I am going to explain it with a test but imagine a have a controller that return a Flux that call the "Test Methods" method

public class TestDelete {

    @Test
    void testFinal(){
        Flux<ClassWithFlag> fluxWithAdditionalsPass = testReturnWhenConditionPass();
        List<ClassWithFlag> receivedItemsPass = new ArrayList<>();
        fluxWithAdditionalsPass.subscribe( receivedItemsPass::add );
        System.out.println(receivedItemsPass);

        Flux<ClassWithFlag> fluxWithAdditionalsNoPass = testReturnWhenConditionNoPass();
        List<ClassWithFlag> receivedItemsNoPass = new ArrayList<>();
        fluxWithAdditionalsNoPass.subscribe( receivedItemsNoPass::add );
        System.out.println(receivedItemsNoPass);
    }

    private Flux<ClassWithFlag> testReturnWhenConditionPass() {
        List<ClassWithFlag> listOfObject =
                List.of(ClassWithFlag.builder().build(), ClassWithFlag.builder().build());
        Flux<ClassWithFlag> fluxOfObjects = Flux.fromIterable(listOfObject);

        Flux<ClassWithFlag> fluxOfObjectsWIthMappings = fluxOfObjects
                .map( FakeService::mapMessageToPass )
                .map(FakeService::setFlagAccordingToMessage)
                .collectList()
                .flatMapMany(list -> {
                    boolean hasErrors = ClassWithFlag.anyHasError(list);
                    if (hasErrors) {
                        return Flux.error(new CustomException(list));
                    }
                    return Flux.fromIterable(list);
                });
        return fluxOfObjectsWIthMappings.collectList()
            .flatMapMany( Flux::fromIterable )
            .map( FakeService::additional);
    }

    private Flux<ClassWithFlag> testReturnWhenConditionNoPass() {
        List<ClassWithFlag> listOfObject =
                List.of(ClassWithFlag.builder().build(), ClassWithFlag.builder().build());
        Flux<ClassWithFlag> fluxOfObjects = Flux.fromIterable(listOfObject);

        Flux<ClassWithFlag> fluxOfObjectsWithMapping = fluxOfObjects
                .map(FakeService::toNotPass)
                .map(FakeService::setFlagAccordingToMessage)
                .collectList()
                .flatMapMany(list -> {
                    boolean hasErrors = ClassWithFlag.anyHasError(list);
                    if (hasErrors) {
                        return Flux.error(new CustomException(list));
                    }
                    return Flux.fromIterable(list);
                });

        return fluxOfObjectsWithMapping
                .collectList()
                .flatMapMany( Flux::fromIterable ).map(FakeService::additional);
    }



    @AllArgsConstructor
    public class CustomException extends RuntimeException {
        List<ClassWithFlag> listOfObjectWithError;
    }
}

So this is the main test and this is the fake service and the class

class FakeService {

    static ClassWithFlag mapMessageToPass(ClassWithFlag classWithFlag) {
        classWithFlag.setMessage("Hello");
        return classWithFlag;
    }

    static ClassWithFlag toNotPass(ClassWithFlag classWithFlag) {
        classWithFlag.setMessage("Bye Bye");
        return classWithFlag;
    }

    static ClassWithFlag additional(ClassWithFlag classWithFlag) {
        classWithFlag.setAdditionalMessage("Additional things done");
        return classWithFlag;
    }

    static ClassWithFlag setFlagAccordingToMessage(ClassWithFlag classWithFlag) {
        if (classWithFlag.getMessage().equals("Hello")) {
            classWithFlag.setError(false);
            return classWithFlag;
        }
        classWithFlag.setError(true);
        return classWithFlag;
    }
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ClassWithFlag {
    private Boolean error;
    private String message;
    private String additionalMessage;

    public static boolean anyHasError(List<ClassWithFlag> classWithFlags) {
        ClassWithFlag classWithFlag = classWithFlags.stream()
                .filter(ClassWithFlag::getError)
                .findFirst()
                .orElse(null);
        return classWithFlag != null;
    }
}

So as you can see in the test I have like two methods "testReturnWhenConditionPass" and "testReturnWhenConditionNoPass" both are the same the only difference is that one will enter in the conditional to return the Flux.error and the other doesn't, so the one that doesn't will do some additional map methods.

Mainly what I want to achieve with this is that in the case any of the classes has "error" flag to true I want to not continue with the flow and return the flux. The only way I have achieve it is with a Flux error with custom exception that will wrap the response and then I have an exception handler that will return the response like this

@ResponseStatus(HttpStatus.OK)
@ExceptionHandler(CustomException.class)
public Flux<ClassWithFlag> handleException(CustomExceptionex) {
    return Flux.fromIterable(ex.getListOfObjectWithError());
}

I would like to know if there is a better way to handle this on webflux and not use and exception and in case of any error directly return the flux to not continue with the "additional" steps since the flux type is the same.

Thanks all and best regards

IMPORTANT: I know this can be achieved by nesting or private methods like this but I dont want to avoid super nesting or private methods

I dont want to do this

private Flux<ClassWithFlag> testReturnWhenConditionNoPass() {
        List<ClassWithFlag> listOfObject =
                List.of(ClassWithFlag.builder().build(), ClassWithFlag.builder().build());
        Flux<ClassWithFlag> fluxOfObjects = Flux.fromIterable(listOfObject);

        return fluxOfObjects
                .map(FakeService::toNotPass)
                .map(FakeService::setFlagAccordingToMessage)
                .collectList()
                .flatMapMany(list -> {
                    boolean hasErrors = ClassWithFlag.anyHasError(list);
                    if (hasErrors) {
                        return Flux.fromIterable(new list);
                    }
                    return Flux.fromIterable(list)
                               .collectList()
                               .flatMapMany(Flux::fromIterable)
                               .map(FakeService::additional);
                });

    }

Imagine the same case with most steps, it will end up in a incredible nesting also having to add private is not valid solution.

TakeWhile and things like that doesnt work aqlso because it makes loose objects

I found a way to do it the problem is that the additionals will be applied to the ones that also doesnt have errors but is a good way of doing it, maybe with an attomic I can do something similar

private Flux<ClassWithFlag> testReturnWhenConditionPass() {
    List<ClassWithFlag> listOfObject =
            List.of(ClassWithFlag.builder().build(), ClassWithFlag.builder().build());

    return Flux.fromIterable(listOfObject)
            .map(FakeService::mapMessageToPass)
            .map(FakeService::setFlagAccordingToMessage)
            .collectList()
            .flatMapMany(list -> {
                boolean hasErrors = ClassWithFlag.anyHasError(list);
                if (hasErrors) {
                    System.out.println("Errors detected in the list.");
                }
                return Flux.fromIterable(list);
            })
            .map(item -> {
                if (!item.hasError()) {
                    return FakeService.additional(item); // Apply `additional` only to valid items
                }
                return item; // Return the item as-is if it has an error
            });
}

Solution

  • I already found the solution, was easier than I though actually

    private Flux<ClassWithFlag> testReturnWhenConditionNoPass() {
            List<ClassWithFlag> listOfObject =
                    List.of(ClassWithFlag.builder().build(), ClassWithFlag.builder().build());
            Flux<ClassWithFlag> fluxOfObjects = Flux.fromIterable(listOfObject);
    
            Flux<ClassWithFlag> fluxOfObjectsWithMapping = fluxOfObjects
                    .map(FakeService::toNotPass)
                    .map(FakeService::setFlagAccordingToMessage)
                    .collectList()
                    .flatMapMany(list -> {
                        boolean hasErrors = ClassWithFlag.anyHasError(list);
                        if (hasErrors) {
                            return Flux.error(new CustomException(list));
                        }
                        return Flux.fromIterable(list);
                    });
    
            return fluxOfObjectsWithMapping
                    .collectList()
                    .flatMapMany( Flux::fromIterable ).map(FakeService::additional)
                    .onErrorResume(exception -> {
                        if (exception instanceof CustomException) {
                            return Flux.fromIterable(((CustomException) exception).getLcoaAdjustments());
                        }
                        return Flux.error(exception);
                    });
        }
    

    So bascially I have to think in a different way, describe all rpocess in the happy path and emit a flux error with an exception that would wrap the list in case of errors. At the end describe what happen if there is an error of this type because all the previos process would be skiped but then changing the flux response for the flux response list with error that already will skip the steps and return the flux to the previos method without having to use a handler