spring-boot, spring-webflux, reactive-mongo-java

How do you throw exceptions within Webflux Mono and Flux streams to the caller?


I have a service that handles the insertion of a new record into a MongoDB collection:

public Mono<ProductDto> insertProduct(Mono<ProductDto> in) {
        //TODO Must handle Duplicate key inserts --> Throw a ProductAlreadyExistsException
        Mono<ProductDto> productDtoMono ;

        try{
            productDtoMono= in.map(ProductDto::toEntity)
                    .flatMap(productRepository::insert)
                    .map(ProductDto::new)
            ;
        }
        catch (DuplicateKeyException ex) {
            throw new ProductAlreadyExistsException();
        }

        return productDtoMono;
    }

When the given ID is already in use, the application throws an org.springframework.dao.DuplicateKeyException.

I am aware that the above code with the try/catch block is incorrect; it is mostly there to demonstrate what I want to do. I am very new to Webflux and reactive programming. I'd like to find out the correct way to handle this, but I have not been able to find much decent sample code for exception handling in the service layer. The examples I find almost always handle it in the router or request handler layer.

Hoping someone might be able to guide me on this.

What I want is for the DuplicateKeyException to be caught and for the application to throw the custom ProductAlreadyExistsException, created for this purpose, in its place.

I have also tried to do this within the flatMap insert, but at this point I am kind of throwing poop at the wall to see if I can stumble into how it should be done:

public Mono<ProductDto> insertProduct(Mono<ProductDto> in) {
        //TODO Must handle Duplicate key inserts --> Throw a ProductAlreadyExistsException
        Mono<ProductDto> productDtoMono ;

            productDtoMono= in.map(ProductDto::toEntity)
                    .flatMap(p -> {
                        try{
                            return productRepository.insert(p);
                        }
                        catch (DuplicateKeyException ex) {
                            return Mono.error(new ProductAlreadyExistsException());
                        }
                    })
                    .map(ProductDto::new)
            ;

        return productDtoMono;
    }

Solution

  • Since DuplicateKeyException is an unchecked exception (checked exceptions are quite annoying to use in reactive code), it travels through the pipeline as an error signal, and you can translate it with the onErrorMap() method:

    public Mono<ProductDto> insertProduct(Mono<ProductDto> in) {
        return in.map(ProductDto::toEntity)
            .flatMap(productRepository::insert)
            .onErrorMap(DuplicateKeyException.class, e -> new ProductAlreadyExistsException())
            .map(ProductDto::new);
    }
    
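    • The intermediate productDtoMono variable in your code is redundant; the pipeline can be returned directly.
    • If, however, you need to work with checked exceptions thrown by a call inside a lambda, your last snippet is typically how you would do it: catch the exception inside the lambda and return Mono.error(...). It does not help for DuplicateKeyException, though, because a reactive repository reports the failure through the Mono it returns and does not throw it from insert(p).
    • Your first snippet of code does not do what you think it does. The catch block will never run, because the try block only assembles the pipeline and nothing executes until it is subscribed to. When the insert fails later, Project Reactor delivers the exception as an error signal to downstream operators and never throws it at the point where the pipeline was built.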
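
To see the difference between the two approaches described above, here is a minimal sketch that needs only reactor-core on the classpath. Everything in it is an assumption made for illustration: the two nested exception classes are local stand-ins for Spring's DuplicateKeyException and the custom ProductAlreadyExistsException, insert(...) is a hypothetical substitute for productRepository.insert that always fails, and failureOf(...) is a helper that subscribes with block() and reports which exception surfaced.

```java
import reactor.core.publisher.Mono;

public class ErrorMappingSketch {

    // Local stand-ins (assumptions) for Spring's DuplicateKeyException
    // and the custom ProductAlreadyExistsException from the question.
    static class DuplicateKeyException extends RuntimeException {}
    static class ProductAlreadyExistsException extends RuntimeException {}

    // Hypothetical repository call: the failure is emitted as an error signal
    // when the Mono is subscribed to; it is not thrown when insert() is called.
    static Mono<String> insert(String id) {
        return Mono.error(new DuplicateKeyException());
    }

    // Mirrors the first snippet: try/catch wrapped around pipeline assembly.
    static Mono<String> withTryCatch(Mono<String> in) {
        try {
            return in.flatMap(ErrorMappingSketch::insert);
        } catch (DuplicateKeyException ex) {
            // Not reached: nothing is subscribed to inside the try block.
            throw new ProductAlreadyExistsException();
        }
    }

    // Mirrors the answer: translate the error signal inside the pipeline.
    static Mono<String> withOnErrorMap(Mono<String> in) {
        return in.flatMap(ErrorMappingSketch::insert)
                 .onErrorMap(DuplicateKeyException.class,
                             e -> new ProductAlreadyExistsException());
    }

    // Subscribes (blocking) and returns the simple class name of the
    // exception that surfaced, or "none" if the Mono completed normally.
    static String failureOf(Mono<String> mono) {
        try {
            mono.block();
            return "none";
        } catch (RuntimeException ex) {
            return ex.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // prints "DuplicateKeyException": the catch block never ran
        System.out.println(failureOf(withTryCatch(Mono.just("p1"))));
        // prints "ProductAlreadyExistsException": onErrorMap translated the signal
        System.out.println(failureOf(withOnErrorMap(Mono.just("p1"))));
    }
}
```

block() is used here only to make the outcome visible in a plain main method; in a Webflux application the framework subscribes for you, and the mapped exception reaches whatever error handling sits in the handler or controller layer.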