rx-java2, vert.x, vertx-verticle

RxJava - retrieving entry in Observable in second flatMapSingle


We have a Vert.x verticle which receives an id and uses it to check whether an entity with that id exists in a database. It contains the following logic:

if (itemFound) {
  e.onNext(item_which_was_found);
} else {
  e.onNext(null);
}

Another verticle has an Observable which processes a list of ids. It uses rxSend to pass each id in the list to the first verticle, which does the database lookup:

Observable<Object> observable = ...
observable.flatMapSingle(id -> {
  return rxSend(VERTICLE_1_ADDRESS, id);
})
.flatMapSingle(i -> {
  // Logic dependent on if item was found
})
.subscribe();

With the above, it is easy to handle the case where the entity associated with the id was found in the database, because the first verticle returns the entity in onNext(). The question is about the second case, when no entity exists and the first verticle calls onNext(null). In that case, how can the second flatMapSingle retrieve the item in the observable which is currently being processed (that is, the id which has no associated database entity)? Or is there a better way to structure the code?

Thanks


Solution

  • You can change your observable definition to:

    Observable<Object> observable = observable();
    observable.flatMapSingle(id -> {
      return rxSend(VERTICLE_1_ADDRESS, id).flatMap(i -> {
        // Logic dependent on if item was found
        // id is visible here
        // Return a Single from this lambda, as flatMap on a Single expects one
      });
    }).subscribe();
    

    The id is then visible in the second lambda, because that lambda is nested inside the first one and captures id from the enclosing scope.
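
To make the nesting idea concrete, here is a minimal, dependency-free sketch. It is not the RxJava/Vert.x code itself: java.util.concurrent.CompletableFuture stands in for Single, thenCompose stands in for flatMap, and the names NestedLookupSketch, DATABASE, lookup, process and processAll are all hypothetical. It also models "not found" with an empty Optional rather than null, since RxJava 2 rejects null values passed to onNext; whether that applies to the first verticle depends on how its reply is actually sent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class NestedLookupSketch {

    // Hypothetical stand-in for the database behind the first verticle.
    static final Map<String, String> DATABASE = new HashMap<>();
    static {
        DATABASE.put("1", "apple");
        DATABASE.put("3", "cherry");
    }

    // Hypothetical stand-in for rxSend(VERTICLE_1_ADDRESS, id): an asynchronous
    // lookup that completes with an empty Optional when no entity exists.
    static CompletableFuture<Optional<String>> lookup(String id) {
        return CompletableFuture.supplyAsync(() -> Optional.ofNullable(DATABASE.get(id)));
    }

    // The second step is chained onto the lookup inside the same method body,
    // so the lambda captures id and can use it in the "not found" branch.
    static CompletableFuture<String> process(String id) {
        return lookup(id).thenCompose(found -> {
            String outcome = found
                    .map(item -> "found " + item + " for id " + id)
                    .orElse("no entity for id " + id);
            return CompletableFuture.completedFuture(outcome);
        });
    }

    // Starts one lookup per id, then collects the results in input order.
    static List<String> processAll(List<String> ids) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String id : ids) {
            pending.add(process(id));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            results.add(future.join());
        }
        return results;
    }

    public static void main(String[] args) {
        processAll(Arrays.asList("1", "2", "3")).forEach(System.out::println);
    }
}
```

Running main prints "found apple for id 1", "no entity for id 2" and "found cherry for id 3", one per line. The point carried over to the RxJava version is only the shape: the follow-up step lives inside the lambda that received the id, rather than in a separate operator further down the chain where the id is no longer available.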