Tags: reactive-programming, nonblocking, spring-webflux, project-reactor

How to manipulate objects coming from a Flux<Object> with a value coming from a method emitting Mono<Items> in a non-blocking way?


I am trying to update the objects received from a Flux with data received from a Mono, where the Flux of objects and the Mono of items come from two different API calls. The problem is that I don't control the threads, and the items from the Mono are never assigned to my object unless I deliberately block() that thread. Is there a non-blocking way to handle this scenario?

I have also looked into Schedulers, subscribeOn, and publishOn, but I could not figure out how to build the pipeline.

public Flux<Object> test() {

 return method1().map(obj -> {
        if (obj.getTotalItems() > 20) {
            // Blocking call: this is what I want to get rid of.
            obj.setItems(method2(obj).block());
        }
        return obj;
  });
}

Here method1 emits a Flux of objects received from an API call.

And method2 emits a Mono with the list of items fetched from another API call.

How can I make this whole flow non-blocking?


Solution

  • Try flatMap or concatMap

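    Using the flatMap operator, you can flatten the inner publisher in a non-blocking way:

    public Flux<Object> test() {
    
     return method1().flatMap(obj -> {
            if (obj.getTotalItems() > 20) {
                // Subscribe to method2 without blocking; set the items when they arrive.
                return method2(obj)
                         .map(result -> {
                            obj.setItems(result);
                            return obj;
                         });
            }
            // Nothing to fetch: pass the object through unchanged.
            return Mono.just(obj);
      });
    }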
    

    flatMap subscribes to several inner streams at a time, so with long-running operations it can process elements more efficiently.

    One downside of flatMap is that it does not preserve the order of elements. Given upstream elements [1, 2, 3, 4], the output order may change because the inner streams complete asynchronously.

    To preserve order, you can use concatMap, which flattens only one inner stream at a time, so the flattened elements are guaranteed to keep the upstream order:

    public Flux<Object> test() {
    
     return method1().concatMap(obj -> {
            if (obj.getTotalItems() > 20) {
                // Inner Monos are subscribed one at a time, in upstream order.
                return method2(obj)
                         .map(result -> {
                            obj.setItems(result);
                            return obj;
                         });
            }
            return Mono.just(obj);
      });
    }
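
To make the ordering difference concrete, here is a minimal sketch that assumes reactor-core is on the classpath. The class `OrderingDemo` and the helper `slowCall` are hypothetical stand-ins for a remote call and are not part of the question's code. The sketch also uses `flatMapSequential`, a Reactor operator that this answer does not otherwise discuss: it subscribes to inner publishers eagerly, like flatMap, but emits results in upstream order.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class OrderingDemo {

    // Hypothetical stand-in for a remote call: later inputs complete sooner.
    static Mono<Integer> slowCall(int i) {
        return Mono.just(i).delayElement(Duration.ofMillis((5 - i) * 50L));
    }

    // Inner Monos run concurrently; results arrive in completion order.
    static List<Integer> withFlatMap() {
        return Flux.just(1, 2, 3, 4)
                .flatMap(OrderingDemo::slowCall)
                .collectList()
                .block(); // blocking only at the edge of this demo
    }

    // Inner Monos run one after another; upstream order is kept.
    static List<Integer> withConcatMap() {
        return Flux.just(1, 2, 3, 4)
                .concatMap(OrderingDemo::slowCall)
                .collectList()
                .block();
    }

    // Inner Monos run concurrently, but results are emitted in upstream order.
    static List<Integer> withFlatMapSequential() {
        return Flux.just(1, 2, 3, 4)
                .flatMapSequential(OrderingDemo::slowCall)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("flatMap:           " + withFlatMap());
        System.out.println("concatMap:         " + withConcatMap());
        System.out.println("flatMapSequential: " + withFlatMapSequential());
    }
}
```

With these delays, flatMap will usually print the elements in reverse, although the exact order is not guaranteed. concatMap and flatMapSequential always produce [1, 2, 3, 4]. The `block()` calls are there only so the demo can print from `main`; they should not appear inside a reactive pipeline.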
    

    Note

    Mutating objects this way is not the best idea; in reactive programming I would prefer to use immutable objects.
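
As a rough illustration of the immutable approach, the sketch below replaces the setter with a copy method. It assumes Java 16+ (for records) and reactor-core on the classpath. Every name in it (`ImmutableEnrichment`, `Order`, `withItems`, `fetchOrders`, `fetchItems`) is hypothetical; `fetchOrders` and `fetchItems` only stand in for method1 and method2 from the question.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class ImmutableEnrichment {

    // Hypothetical immutable model; a new instance is created instead of calling a setter.
    record Order(String id, int totalItems, List<String> items) {
        Order withItems(List<String> newItems) {
            return new Order(id, totalItems, List.copyOf(newItems));
        }
    }

    // Stand-in for the first API call (method1 in the question).
    static Flux<Order> fetchOrders() {
        return Flux.just(
                new Order("a", 5, List.of()),
                new Order("b", 25, List.of()));
    }

    // Stand-in for the second API call (method2 in the question).
    static Mono<List<String>> fetchItems(Order order) {
        return Mono.just(List.of(order.id() + "-item-1", order.id() + "-item-2"));
    }

    // Same shape as the concatMap solution, but each step returns a fresh Order.
    static Flux<Order> enrichedOrders() {
        return fetchOrders().concatMap(order -> {
            if (order.totalItems() > 20) {
                return fetchItems(order).map(order::withItems);
            }
            return Mono.just(order);
        });
    }
}
```

Because no shared object is modified, it does not matter which thread delivers the result of `fetchItems`; each subscriber simply receives a new `Order` value.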