Tags: spring, spring-webflux, spring-webclient, reactor

Immediately return first emitted value from two Monos while continuing to process the other asynchronously


I have two data sources, each returning a Mono:

class CacheCustomerClient {
    Mono<Entity> createCustomer(Customer customer)
}

class MasterCustomerClient {
    Mono<Entity> createCustomer(Customer customer)
}

Callers to my application are hitting a Spring WebFlux controller:

@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Flux<Entity> createCustomer(@RequestBody Customer customer) {
    return customerService.createCustomer(customer);
}

As long as either data source successfully completes its create operation, I want to immediately return a success response to the caller. However, I still want my service to keep processing the result of the other Mono, so that any error it encounters can be logged.

The problem seems to be that as soon as a value is returned to the controller, a cancel signal is propagated back through the stream by Spring WebFlux and, thus, no information is logged about a failure.

Here's one attempt:

public Flux<Entity> createCustomer(final Customer customer) {
        var cacheCreate = cacheClient
                .createCustomer(customer)
                .doOnError(WebClientResponseException.class,
                    err -> log.error("Customer creation failed in cache"));
        var masterCreate = masterClient
                .createCustomer(customer)
                .doOnError(WebClientResponseException.class,
                    err -> log.error("Customer creation failed in master"));

        return Flux.firstWithValue(cacheCreate, masterCreate)
                .onErrorMap((err) -> new Exception("Customer creation failed in cache and master"));
    }

Flux.firstWithValue() is great for emitting the first non-error value, but then whichever source is lagging behind is cancelled, meaning that any error is never logged out. I've also tried scheduling these two sources on their own Schedulers and that didn't seem to help either.
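
The cancellation described above can be reproduced in isolation. The following is a minimal sketch, assuming reactor-core 3.4 or later (where firstWithValue is available); the class, the two mock sources, and the recorded event strings are hypothetical and stand in for the real clients and logger.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Mono;

public class FirstWithValueCancelDemo {

    // Hypothetical fast source that succeeds after 100 ms.
    static Mono<String> fast() {
        return Mono.delay(Duration.ofMillis(100)).thenReturn("fast");
    }

    // Hypothetical slow source that would fail after 400 ms if left running.
    static Mono<String> slowFailure(List<String> events) {
        return Mono.delay(Duration.ofMillis(400))
                .then(Mono.<String>error(new IllegalStateException("slow source failed")))
                .doOnCancel(() -> events.add("cancelled"))
                .doOnError(err -> events.add("error logged"));
    }

    // Races the two sources, then waits past the slow source's deadline
    // and returns everything that was recorded along the way.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        String winner = Mono.firstWithValue(fast(), slowFailure(events)).block();
        events.add("winner=" + winner);
        // Give the slow source time to fail, in case it were still running.
        Mono.delay(Duration.ofMillis(600)).block();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the slow source records a cancel and never reaches its doOnError callback, which matches the missing log line in the real service.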

How can I perform these two calls asynchronously, and emit the first value to the caller, while continuing to listen for emissions on the slower source?


Solution

  • You can achieve this by turning your Monos into "hot" publishers with the share() operator:

    1. The first subscriber triggers the upstream source; additional subscribers receive the same result instead of running it again:

      Further Subscriber will share [...] the same result.

    2. The subscription to the upstream is not cancelled when a later subscriber cancels, which is what firstWithValue does to the slower source:

      It's worth noting this is an un-cancellable Subscription.
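
The sharing behaviour in point 1 can be checked with a small experiment. This is only a sketch under stated assumptions: the class name, the counter, and the 100 ms mock source are hypothetical, and both subscribers attach before the source completes.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class ShareOnceDemo {

    // Counts how many times the upstream source is actually subscribed to.
    static final AtomicInteger UPSTREAM_SUBSCRIPTIONS = new AtomicInteger();

    static List<String> run() {
        UPSTREAM_SUBSCRIPTIONS.set(0);
        List<String> received = new CopyOnWriteArrayList<>();

        Mono<String> shared = Mono.delay(Duration.ofMillis(100))
                .map(tick -> "value")
                .doOnSubscribe(s -> UPSTREAM_SUBSCRIPTIONS.incrementAndGet())
                .share();

        shared.subscribe(received::add); // first subscriber starts the upstream
        shared.subscribe(received::add); // second subscriber joins the in-flight result

        // Wait for the shared result to arrive.
        Mono.delay(Duration.ofMillis(300)).block();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run() + " upstream subscriptions: " + UPSTREAM_SUBSCRIPTIONS.get());
    }
}
```

Both subscribers should receive the value while the counter shows that the upstream was subscribed to only once.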

    So, to achieve your requirement:

    1. Apply share() to each of your Monos
    2. Subscribe to each shared Mono to start processing
    3. Use the shared Monos in your pipeline (here, firstWithValue).
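
Applied to the service method from the question, the three steps might look roughly like the sketch below. It is not the asker's actual code: the two mock sources replace cacheClient and masterClient, String replaces Customer and Entity, and the LOG list replaces a real logger, all of which are assumptions made to keep the example self-contained. Mono.defer is an addition here so that nothing starts until the caller subscribes.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Mono;

public class SharedCreateSketch {

    // Stands in for a real logger so the recorded messages can be inspected.
    static final List<String> LOG = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for cacheClient.createCustomer(customer): succeeds after 100 ms.
    static Mono<String> cacheCreate(String customer) {
        return Mono.delay(Duration.ofMillis(100)).thenReturn("cache:" + customer);
    }

    // Hypothetical stand-in for masterClient.createCustomer(customer): fails after 400 ms.
    static Mono<String> masterCreate(String customer) {
        return Mono.delay(Duration.ofMillis(400))
                .then(Mono.<String>error(new IllegalStateException("master down")));
    }

    static Mono<String> createCustomer(String customer) {
        return Mono.defer(() -> {
            // Step 1: share each source; logging sits upstream of share() so it runs once.
            Mono<String> cache = cacheCreate(customer)
                    .doOnError(err -> LOG.add("Customer creation failed in cache"))
                    .share();
            Mono<String> master = masterCreate(customer)
                    .doOnError(err -> LOG.add("Customer creation failed in master"))
                    .share();

            // Step 2: trigger both sources. The error callbacks are no-ops
            // because doOnError above has already recorded the failure.
            cache.subscribe(value -> { }, err -> { });
            master.subscribe(value -> { }, err -> { });

            // Step 3: race the shared Monos and return the first value.
            return Mono.firstWithValue(cache, master)
                    .onErrorMap(err -> new IllegalStateException(
                            "Customer creation failed in cache and master", err));
        });
    }

    public static void main(String[] args) {
        System.out.println(createCustomer("alice").block());
        Mono.delay(Duration.ofMillis(600)).block(); // let the slower source finish
        System.out.println(LOG);
    }
}
```

Passing an explicit error callback to the triggering subscribe calls is a design choice in this sketch: without one, Reactor treats the error as unhandled by that subscriber.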

    Example:

    import java.time.Duration;
    import reactor.core.publisher.Mono;
    
    public class TestUncancellableMono {
    
        // Mock a Mono that succeeds quickly
        static Mono<String> quickSuccess() {
            return Mono.delay(Duration.ofMillis(200)).thenReturn("SUCCESS !");
        }
    
        // Mock a mono taking more time and ending in error.
        static Mono<String> longError() {
            return Mono.delay(Duration.ofSeconds(1))
                       .<String>then(Mono.error(new Exception("ERROR !")))
                       .doOnCancel(() -> System.out.println("CANCELLED"))
                       .doOnError(err -> System.out.println(err.getMessage()));
        }
    
        public static void main(String[] args) throws Exception {
            // Transform to hot publisher
            var sharedQuick = quickSuccess().share();
            var sharedLong  = longError().share();
    
            // Trigger launch
            sharedQuick.subscribe();
            sharedLong.subscribe();
    
            // Subscribe back to get the cached result
            Mono
                    .firstWithValue(sharedQuick, sharedLong)
                    .subscribe(System.out::println, err -> System.out.println(err.getMessage()));
    
            // Wait for subscription to end.
            Thread.sleep(2000);
        }
    }
    

    The output of the sample is:

    SUCCESS !
    ERROR !
    

    The error message is printed and "CANCELLED" never appears, so the slower upstream publisher ran to completion instead of being cancelled.