RxJava Single not returning from Single.merge


I have several API calls (Rx Singles) that I want to combine into a single Single. I'm using Single.merge to try to combine the results of these calls, but when I subscribe to the response I get an empty array, because the subscriber fires before the results have arrived. I call the HealthChecker expecting that the subscriber will receive the list of results:

     new HealthChecker(vertx)
        .getHealthChecks(endpoints)
        .subscribe(messages -> {
            log.info("Completed health check {}", messages);
            routingContext.response()
                          .putHeader("content-type", "text/json")
                          .end(messages.toString());
        });

The health checker class performs the logic:

public class HealthChecker {

    private static Logger log = LoggerFactory.getLogger(HealthChecker.class);

    private Vertx vertx;
    private WebClient client;

    public HealthChecker(Vertx vertx) {
        this.vertx = vertx;
        client = WebClient.create(vertx);
    }

    public Single<List<String>> getHealthChecks(JsonArray endpoints) {
        return Single.fromCallable(() -> {

            List<Single<String>> healthChecks = endpoints
                .stream()
                .map(endpoint -> getHealthStatus(client, endpoint.toString()))
                .collect(Collectors.toList());

            return consumeHealthChecks(healthChecks).blockingGet();

        });
    }

    private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
        return Single.fromCallable(() -> {
            List<String> messages = new ArrayList<>();

            Single.merge(healthChecks)
                  .timeout(1500, TimeUnit.MILLISECONDS)
                  .subscribe(message -> {
                      log.info("Got health check {}", message);
                      messages.add(message);
                  }, error -> {
                      log.info("Timeout - could not get health check");

                  });

            return messages;
        });
    }

    private Single<String> getHealthStatus(WebClient client, String endpoint) {
        log.info("getting endpoint {}", endpoint);

        return client
            .getAbs(endpoint)
            .rxSend()
            .map(HttpResponse::bodyAsString)
            .map(response -> response);

    }
}

I expect the return value to be a list of results, but all I get is an empty list, and the results arrive afterwards. Here is the log:

09:12:06.235 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5000/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5001/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5002/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5003/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5004/status
09:12:06.300 [vert.x-eventloop-thread-1] INFO  sys.health.HealthCheckVerticle - Completed health check []
09:12:06.688 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:06.844 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:06.898 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":false}
09:12:07.072 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:07.255 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":true}

Solution

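  • Why are you using fromCallable and blockingGet? You also fire off the merge without waiting for it to run to completion: subscribe only registers the callbacks and here returns before any HTTP response arrives, so messages is still empty when it is returned. Instead, compose over the inner Singles and let toList() emit the collected results once the merged stream completes. A timeout then reaches the caller as an error rather than being logged and swallowed, so the caller's subscribe should also pass an error handler. The composed version: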

    public Single<List<String>> getHealthChecks(JsonArray endpoints) {
        return Single.defer(() -> {
    
            List<Single<String>> healthChecks = endpoints
                .stream()
                .map(endpoint -> getHealthStatus(client, endpoint.toString()))
                .collect(Collectors.toList());
    
            return consumeHealthChecks(healthChecks);
        });
    }
    
    private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
        return Single.merge(healthChecks)
                     .timeout(1500, TimeUnit.MILLISECONDS)
                     .toList();
    }
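
The difference between the two approaches can be reproduced without RxJava or Vert.x. The following is a minimal sketch that swaps in the JDK's CompletableFuture (Java 9+) as an analogue; it is not the RxJava API. The class HealthCheckSketch and the helper fakeCheck are hypothetical names, and fakeCheck merely simulates an HTTP call with a delay. Unlike Single.merge, the composed variant here returns results in list order rather than completion order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class HealthCheckSketch {

    // Hypothetical stand-in for an HTTP call: completes with `body` after `delayMs`.
    static CompletableFuture<String> fakeCheck(String body, long delayMs) {
        return CompletableFuture.supplyAsync(
            () -> body,
            CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS));
    }

    // Same mistake as the question: register callbacks, then return at once.
    static List<String> fireAndForget(List<CompletableFuture<String>> checks) {
        List<String> messages = Collections.synchronizedList(new ArrayList<>());
        checks.forEach(check -> check.thenAccept(messages::add));
        // Snapshot taken before any delayed check has had time to complete.
        return new ArrayList<>(messages);
    }

    // Composed version: the returned future completes only after all checks have.
    static CompletableFuture<List<String>> composed(List<CompletableFuture<String>> checks) {
        return CompletableFuture
            .allOf(checks.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> checks.stream()
                .map(CompletableFuture::join) // already completed, so this does not block
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<String> early = fireAndForget(List.of(fakeCheck("a", 200), fakeCheck("b", 300)));
        System.out.println("fire-and-forget: " + early);

        // join() is used only so this demo can print; in Vert.x code, chain a callback instead.
        List<String> all = composed(List.of(fakeCheck("a", 200), fakeCheck("b", 300))).join();
        System.out.println("composed: " + all);
    }
}
```

The first call returns before either simulated response has arrived, which mirrors the empty "Completed health check []" line in the log; the second delivers the list only once every check has finished, which is the role toList() plays in the RxJava answer.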