
Vertx/RxJava/WebClient/ApiGateway/Reactive


I am working on an API gateway using Vert.x and RxJava. I want to send reactive requests to two APIs, get the responses from both of them, and send the combined JSON back through the HttpServer. But onComplete() executes too early and returns empty JSON. I think the problem comes from the asynchronous nature of Vert.x, but I don't know exactly what's wrong.

Here is my method:

private void dispatchBoth(RoutingContext routingContext) {

    Observer<String> observer = new Observer<String>() {

        JsonArray jsonArray = new JsonArray();

        @Override
        public void onSubscribe(Disposable disposable) {
            System.out.println("Start");

        }

        @Override
        public void onNext(String s) {
            Thread t = new Thread(() -> {
                if("/api/userApi/selectAllUsers".equals(s)) {

                        WebClient client = WebClient.create(vertx);
                            client
                                .get(8081, "localhost", s)
                                .send(ar->{
                                    if (ar.succeeded()) {
                                        HttpResponse<Buffer> response = ar.result();

                                        jsonArray.addAll(response.bodyAsJsonArray());
                                        System.out.println(jsonArray.encodePrettily());

                                    } else {
                                        System.out.println("Something went wrong " + ar.cause().getMessage());
                                    }
                                });

                }else if("/api/holidayApi/selectAllHolidays".equals(s)) {
                        WebClient client = WebClient.create(vertx);
                        client
                                .get(8080, "localhost", s)
                                .send(ar -> {

                                    if (ar.succeeded()) {

                                        HttpResponse<Buffer> response = ar.result();

                                        jsonArray.addAll(response.bodyAsJsonArray());
                                       //  System.out.println(jsonArray.encodePrettily());

                                    } else {
                                        System.out.println("Something went wrong " + ar.cause().getMessage());
                                    }
                                });
                    }
                });
                t.start();
        }
        @Override
        public void onError(Throwable throwable) {

        }

        @Override
        public void onComplete() {
                System.out.println(jsonArray.encodePrettily());
                routingContext.response().end(jsonArray.encodePrettily());
        }
    };
    Observable.fromArray(com).subscribe(observer);
}

And that's the output I get on the console:

[ ]
[ {
  "holidayId" : 2,
  "userId" : 3,
  "place" : "Poland",
  "date" : {
    "year" : 2016,
    "month" : "DECEMBER",
    "dayOfMonth" : 29,
    "dayOfWeek" : "THURSDAY",
    "era" : "CE",
    "dayOfYear" : 364,
    "leapYear" : true,
    "monthValue" : 12,
    "chronology" : {
      "id" : "ISO",
      "calendarType" : "iso8601"
    }
  }
}, {
  "holidayId" : 10,
  "userId" : 1,
  "place" : "Netherland",
  "date" : {
    "year" : 2020,
    "month" : "JANUARY",
    "dayOfMonth" : 21,
    "dayOfWeek" : "TUESDAY",
    "era" : "CE",
    "dayOfYear" : 21,
    "leapYear" : true,
    "monthValue" : 1,
    "chronology" : {
      "id" : "ISO",
      "calendarType" : "iso8601"
    }
  }
}, {
  "userId" : 1,
  "name" : "Kacper",
  "phone_number" : "667667202"
}, {
  "userId" : 3,
  "name" : "Kamil",
  "phone_number" : "6734583443"
}, {
  "userId" : 4,
  "name" : "Janek",
  "phone_number" : "231253575"
}, {
  "userId" : 5,
  "name" : "Grzegorz",
  "phone_number" : "123456789"
}, {
  "userId" : 6,
  "name" : "Justin",
  "phone_number" : "111000111"
}, {
  "userId" : 8,
  "name" : "Mike",
  "phone_number" : "997"
}, {
  "userId" : 9,
  "name" : "Gorge",
  "phone_number" : "991"
} ]

Solution

  • onComplete executes on time: it fires when the input flow of strings is finished, not when the HTTP responses arrive. You need another way to wait for the moment when all I/O operations have completed. This is the tricky part. I do not know whether Vert.x or RxJava can do this, but the standard Java API can, using CompletableFuture. So we create an adapter from CompletableFuture to Handler, which holds the result of one I/O operation:

    class HandelerFuture extends CompletableFuture<JsonArray> 
             implements Handler<AsyncResult<HttpResponse<Buffer>>> {
        @Override
        public void handle(AsyncResult<HttpResponse<Buffer>> ar) {
            if (ar.succeeded()) {
                JsonArray array = ar.result().bodyAsJsonArray();
                super.complete(array);
            } else {
                super.completeExceptionally(ar.cause());
            }
        }
    }
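
    The same adapter idea can be tried out without Vert.x. The following is a minimal sketch using only the JDK: fetchAsync is a hypothetical stand-in for a callback-based client such as WebClient, and the class name, method names, and string payloads are assumptions made for illustration. Each callback completes its own future, and the combined result is built only after CompletableFuture.allOf reports that every future is done.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class CallbackToFutureSketch {

    // Hypothetical stand-in for an asynchronous client:
    // the callback is invoked later, on another thread.
    static void fetchAsync(String url, Consumer<List<String>> callback) {
        CompletableFuture.runAsync(
                () -> callback.accept(Arrays.asList("item from " + url)));
    }

    // Adapter: the future's complete() method serves as the callback.
    static CompletableFuture<List<String>> fetch(String url) {
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        fetchAsync(url, future::complete);
        return future;
    }

    // Starts all requests, then combines the results once every future is done.
    static CompletableFuture<List<String>> fetchAll(String... urls) {
        List<CompletableFuture<List<String>>> futures = new ArrayList<>();
        for (String url : urls) {
            futures.add(fetch(url));
        }
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    List<String> combined = new ArrayList<>();
                    for (CompletableFuture<List<String>> f : futures) {
                        combined.addAll(f.join()); // already completed, does not block
                    }
                    return combined;
                });
    }
}
```

    The results are concatenated in the order the URLs were passed in, regardless of which callback fires first.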
    

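    Besides, you do not need to wrap the body of the onNext method in a Thread, because send() is already asynchronous. Second, you do not need to check which URL string is passed, as it makes no difference to how the response is handled (apart from the port, which differs between the two APIs in the question). Third, you use an Observer only to iterate over a list of URL strings, which is an overcomplication. A plain loop is sufficient.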

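    CompletableFuture<HandelerFuture[]> dispatchBoth(String... urls) {
        // One client is enough for all requests.
        WebClient client = WebClient.create(vertx);
        ArrayList<HandelerFuture> futures = new ArrayList<>(); // all results
        for (String url : urls) {
            HandelerFuture future = new HandelerFuture();
            futures.add(future);
            // Note: the question calls the two APIs on different ports (8081 and 8080).
            // The port is hardcoded here, so pass it along with the URL if it differs.
            client.get(8081, "localhost", url)
                  .send(future);
        }
        CompletableFuture<HandelerFuture[]> all = new CompletableFuture<>();
        HandelerFuture[] array = futures.toArray(new HandelerFuture[0]);
        // whenComplete also fires when a request has failed, so the caller
        // can still inspect each future individually.
        CompletableFuture.allOf(array)
                .whenComplete((ignored, error) -> all.complete(array));
        return all;
    }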
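
    When relying on CompletableFuture.allOf, it helps to know what happens if one of the requests fails. The future returned by allOf then completes exceptionally, so stages such as thenRun are skipped, while whenComplete is invoked in both the success and the failure case. The JDK-only sketch below illustrates this; the class name and the "boom" exception are made up for the example.

```java
import java.util.concurrent.CompletableFuture;

class AllOfFailureSketch {

    // Returns {thenRunExecuted, whenCompleteExecuted}.
    static boolean[] run() {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("ok");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("boom"));

        boolean[] ran = new boolean[2];
        CompletableFuture<Void> all = CompletableFuture.allOf(ok, failed);

        // Skipped: 'all' completed exceptionally because 'failed' did.
        all.thenRun(() -> ran[0] = true);
        // Invoked in either case; 'error' is non-null here.
        all.whenComplete((ignored, error) -> ran[1] = true);
        return ran;
    }
}
```

    In this sketch run() returns {false, true}: only the whenComplete action is executed.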
    

    Then it can be used as follows. The result is consumed in a callback rather than with a blocking future.get(), because Vert.x handlers must not block the event loop:

        dispatchBoth(com).thenAccept(results -> {
            // Runs once all requests have finished.
            JsonArray finalArray = new JsonArray();
            for (HandelerFuture result : results) {
                try {
                    // extract partial json array
                    JsonArray partialArray = result.get();
                    // combine partialArray with finalArray somehow
                } catch (Exception e) {
                    // this is the exception received in the handle() method as ar.cause(),
                    // wrapped in an ExecutionException
                    e.printStackTrace();
                }
            }
            routingContext.response().end(finalArray.encodePrettily());
        });
    

    You did not say how you want to combine the JSON arrays, so I left that step unimplemented.