I have a verticle which consumes incoming messages. Each message is a Vert.x JsonObject, which contains a Vert.x JsonArray. I want to execute logic for each element in this array. The logic itself is contained in a separate verticle. This second verticle uses rxVertx. It defines several consumers, each of which delegates to a separate method, all of which return an Observable.
My question is how to iterate over the JsonArray and handle the resulting Observables.
In the first verticle, I have tried the following:
EventBus eb = rxVertx.eventBus();
JsonArray array = incomingMessage.getJsonArray(KEY);
List<Object> list = array.getList();
// fromIterable emits each element; fromArray(list) would emit the whole list as one item
Observable<Object> observable = Observable.fromIterable(list);
observable.flatMapSingle(s -> {
eb.rxSend(SECOND_VERTICLE_ADDRESS, s);
}).subscribe();
The call to flatMapSingle does not compile because:
The method flatMapSingle(Function<? super Object,? extends SingleSource<? extends R>>) in the type Observable<Object> is not applicable for the arguments ((<no type> s) -> {})
What is the correct way to do this? Thanks very much
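If you define the flatMapSingle function parameter as a lambda with a code block, you must use the return keyword. Without it, the block returns nothing, so the lambda cannot match a Function that must return a SingleSource: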
observable.flatMapSingle(s -> {
return eb.rxSend(SECOND_VERTICLE_ADDRESS, s);
}).subscribe(reply -> {
// Handle each reply
});
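The same rule can be seen without Vert.x or RxJava. The following is a minimal sketch using only java.util.function.Function; the class name LambdaReturnDemo and its fields are made up for illustration and are not part of the original code.

```java
import java.util.function.Function;

// Hypothetical demo class, for illustration only.
public class LambdaReturnDemo {

    // Expression body: the value of the expression is returned implicitly.
    static final Function<String, String> EXPRESSION_BODY = s -> s.toUpperCase();

    // Block body: the value has to be returned explicitly.
    static final Function<String, String> BLOCK_BODY = s -> {
        return s.toUpperCase();
    };

    // A block body without `return` yields no value, so it does not match
    // Function<String, String> and the line below would not compile:
    // static final Function<String, String> BROKEN = s -> { s.toUpperCase(); };

    public static void main(String[] args) {
        System.out.println(EXPRESSION_BODY.apply("a"));
        System.out.println(BLOCK_BODY.apply("b"));
    }
}
```

Applied to the question, this means the braces can also simply be dropped: s -> eb.rxSend(SECOND_VERTICLE_ADDRESS, s) is an expression lambda and needs no return keyword.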
Note that flatMapSingle does not guarantee that replies arrive in the same order as the incoming messages. If you need that guarantee, use concatMapSingle.