
Traversing a Vertx JsonArray using RxJava


I have a verticle that consumes incoming messages. Each message is a Vert.x JsonObject containing a Vert.x JsonArray. I want to run some logic for each element of this array. The logic itself lives in a separate verticle, which uses rxVertx. It defines several consumers, each of which delegates to a separate method, and all of those methods return an Observable.

My question is how to:

  1. traverse each element in the JsonArray
  2. pass each element to the consumer which works with Observables.

In the first verticle, I have tried the following:

EventBus eb = rxVertx.eventBus();
JsonArray array= incomingMessage.getJsonArray(KEY);
List<Object> list  = array.getList();
Observable<Object> observable = Observable.fromArray(list);

observable.flatMapSingle(s -> {
      eb.rxSend(SECOND_VERTICLE_ADDRESS, s);
   }).subscribe();

The call to flatMapSingle does not compile because:

The method flatMapSingle(Function<? super Object,? extends SingleSource<? extends R>>) in the type Observable<Object> is not applicable for the arguments ((<no type> s) -> {})

What is the correct way to do this? Thanks very much


Solution

  • If you write the function passed to flatMapSingle as a lambda with a block body (curly braces), you must use the return keyword:

    observable.flatMapSingle(s -> {
      return eb.rxSend(SECOND_VERTICLE_ADDRESS, s);
    }).subscribe(reply -> {
      // Handle each reply
    });
    

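    Note that flatMapSingle does not guarantee that replies arrive in the same order as the incoming messages. If you need that guarantee, use concatMapSingle.

    Also note that Observable.fromArray(list) treats the List as a single varargs item, so the observable emits the whole list once instead of each element. To emit the elements one by one, use Observable.fromIterable(list).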
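The return rule above is ordinary Java lambda syntax rather than anything specific to RxJava, so it can be checked without Vert.x on the classpath. The sketch below uses only java.util.function.Function. The class name LambdaReturnSketch and the send method are hypothetical stand-ins (send takes the place of eb.rxSend and simply returns a String); they are not part of the original code.

```java
import java.util.function.Function;

public class LambdaReturnSketch {

    // Hypothetical stand-in for eb.rxSend(...): returns a value derived from its input.
    static String send(Object element) {
        return "reply:" + element;
    }

    // Expression body: the value of the expression is returned implicitly.
    static final Function<Object, String> EXPRESSION_FORM = s -> send(s);

    // Block body: the value must be returned explicitly. Writing
    //   s -> { send(s); }
    // here would not compile, because the block yields no value for Function.
    static final Function<Object, String> BLOCK_FORM = s -> {
        return send(s);
    };

    public static void main(String[] args) {
        System.out.println(EXPRESSION_FORM.apply("a"));
        System.out.println(BLOCK_FORM.apply("a"));
    }
}
```

Both forms behave identically. Applied to the question, this means either adding return inside the braces, as in the solution, or dropping the braces and the semicolon so that the rxSend call is the lambda's expression body.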