Search code examples
springspring-bootreactive-programmingproject-reactorspring-webclient

How to use Spring WebClient to make non-blocking calls and send email after all calls complete?


I'm using Spring's 'WebClient` and project reactor to make non-blocking calls to a list of URLs. My requirements are:

  1. Asynchronously call GET on a list of URLs
  2. Log the URL when each URL is called
  3. Log the URL of a call that results in a exception
  4. Log the URL of a call that is successful
  5. Log the URL of a call that results in a non 2xx HTTP status
  6. Send an email containing a list of URLs where the call resulted in an exception or non 2xx HTTP status

Here's my attempt to do this:

List<Mono<ClientResponse>> restCalls = new ArrayList<>();
List<String> failedUrls = new ArrayList<>();    
for (String serviceUrl : serviceUrls.getServiceUrls()) {
        
        restCalls.add(
                webClientBuilder
                    .build()
                    .get()
                    .uri(serviceUrl)
                    .exchange()
                    .doOnSubscribe(c -> log.info("calling service URL {}", serviceUrl))
                    .doOnSuccess(response -> log.info("{} success status {}", serviceUrl, response.statusCode().toString()))
                    .doOnError(response -> {log.info("{} error status {}", serviceUrl, response); failedUrls.add(serviceUrl);}));
    }
    
    Flux.fromIterable(restCalls)
        .map((data) -> data.subscribe())
        .onErrorContinue((throwable, e) -> {
             
            log.info("Exception for URL {}", ((WebClientResponseException) throwable).getRequest().getURI());
          
            failedUrls.add(serviceUrl);
        })
        .collectList()
        .subscribe((data) -> {
            log.info("all called");
            email.send("Failed URLs are {}", failedUrls);
    });

The problem is the email is sent before the calls respond. How can I wait until all URLs calls have been completed prior to calling email.send?


Solution

  • As stated in comment, the main error in your example is the use of 'subscribe', that launch queries, but in a context independant from the main flux, so you cannot get back errors or results.

    subscribe is sort of a trigger operation on the pipeline, it's not used for chaining.

    Here is a full example (except email, replaced by logging):

    package fr.amanin.stackoverflow;
    
    import org.springframework.http.HttpStatus;
    import org.springframework.web.reactive.function.client.WebClient;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    import java.util.stream.Collectors;
    
    public class WebfluxURLProcessing {
    
        private static final Logger LOGGER = Logger.getLogger("example");
    
        public static void main(String[] args) {
    
            final List<String> urls = Arrays.asList("https://www.google.com", "https://kotlinlang.org/kotlin/is/wonderful/", "https://stackoverflow.com", "http://doNotexists.blabla");
    
            final Flux<ExchangeDetails> events = Flux.fromIterable(urls)
                    // unwrap request async operations
                    .flatMap(url -> request(url))
                    // Add a side-effect to log results
                    .doOnNext(details -> log(details))
                    // Keep only results that show an error
                    .filter(details -> details.status < 0 || !HttpStatus.valueOf(details.status).is2xxSuccessful());
    
            sendEmail(events);
        }
    
        /**
         * Mock emails by collecting all events in a text and logging it.
         * @param report asynchronous flow of responses
         */
        private static void sendEmail(Flux<ExchangeDetails> report) {
            final String formattedReport = report
                    .map(details -> String.format("Error on %s. status: %d. Reason: %s", details.url, details.status, details.error.getMessage()))
                    // collecting (or reducing, folding, etc.) allows to gather all upstream results to use them as a single value downstream.
                    .collect(Collectors.joining(System.lineSeparator(), "REPORT:"+System.lineSeparator(), ""))
                    // In a real-world scenario, replace this with a subscribe or chaining to another reactive operation.
                    .block();
            LOGGER.info(formattedReport);
        }
    
        private static void log(ExchangeDetails details) {
            if (details.status >= 0 && HttpStatus.valueOf(details.status).is2xxSuccessful()) {
                LOGGER.info("Success on: "+details.url);
            } else {
                LOGGER.log(Level.WARNING,
                        "Status {0} on {1}. Reason: {2}",
                        new Object[]{
                                details.status,
                                details.url,
                                details.error == null ? "None" : details.error.getMessage()
                        });
            }
        }
    
        private static Mono<ExchangeDetails> request(String url) {
            return WebClient.create(url).get()
                    .retrieve()
                    // workaround to counter fail-fast behavior: create a special error that will be converted back to a result
                    .onStatus(status -> !status.is2xxSuccessful(), cr -> cr.createException().map(err -> new RequestException(cr.statusCode(), err)))
                    .toBodilessEntity()
                    .map(response -> new ExchangeDetails(url, response.getStatusCode().value(), null))
                    // Convert back custom error to result
                    .onErrorResume(RequestException.class, err -> Mono.just(new ExchangeDetails(url, err.status.value(), err.cause)))
                    // Convert errors that shut connection before server response (cannot connect, etc.) to a result
                    .onErrorResume(Exception.class, err -> Mono.just(new ExchangeDetails(url, -1, err)));
        }
    
        public static class ExchangeDetails {
            final String url;
            final int status;
            final Exception error;
    
            public ExchangeDetails(String url, int status, Exception error) {
                this.url = url;
                this.status = status;
                this.error = error;
            }
        }
    
        private static class RequestException extends RuntimeException {
            final HttpStatus status;
            final Exception cause;
    
            public RequestException(HttpStatus status, Exception cause) {
                this.status = status;
                this.cause = cause;
            }
        }
    }