kotlin, reactive-programming, spring-webflux, project-reactor

How to run a Flux in parallel but collect the results in sequence


I want to run a Flux in parallel but collect the results in source order. Suppose I have a Flux of [3, 2, 1]: after running some task on each element in parallel, I expect the result to still be [3, 2, 1].

val result = Flux.fromIterable(3 downTo 1)
      .map { it.toString() }
      // task() returns a publisher that emits the same number it was given
      .flatMap { number -> task(number) }
      .doOnNext { println("Number of $it") }
      // Prints 1, 3, 2, which is fine because I want the tasks to run in parallel
      .collectList()
      .doOnNext { println(it) }
      // Still prints [1, 3, 2], but I want [3, 2, 1], matching the iterable's order
      .block()

Solution

  • I think the flatMapSequential operator is what you are looking for.

    Flux.fromIterable(3 downTo 1)
          .map { it.toString() }
          .flatMapSequential { number -> task(number) }
          .doOnNext { println("Number of $it") }
          .collectList()
          .doOnNext { println(it) }
    

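    Here, flatMapSequential subscribes to its inner publishers eagerly (so they run concurrently, as with flatMap) but merges their results in the order of the source elements, as desired. Results from later elements that finish early are queued until the earlier elements have completed.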

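    Sample output (the first three lines presumably come from logging inside task and show the order of execution; a doOnNext placed after flatMapSequential sees the elements in source order):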

    3
    1
    2
    [3, 2, 1]
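
The behaviour described above can be tried out with a small self-contained sketch. The question does not show task, so the version below is a hypothetical stand-in that echoes its input after a delay proportional to the number (so "1" should finish first and "3" last). The function name collectInSourceOrder and the log messages are likewise made up for illustration. It assumes reactor-core is on the classpath.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration

// Hypothetical stand-in for the question's task(): emits the same number
// after a delay of number * 100 ms, so smaller numbers complete earlier.
fun task(number: String): Mono<String> =
    Mono.delay(Duration.ofMillis(number.toLong() * 100)).thenReturn(number)

// Runs all tasks concurrently and returns the results in source order.
fun collectInSourceOrder(): List<String> =
    Flux.fromIterable(3 downTo 1)
        .map { it.toString() }
        // Subscribes to every inner Mono eagerly, but emits their results
        // downstream in the order of the source elements.
        .flatMapSequential { number ->
            // Logged inside the inner publisher: reflects completion order.
            task(number).doOnNext { println("Task finished: $it") }
        }
        // Logged after the merge: reflects source order.
        .doOnNext { println("Emitted downstream: $it") }
        .collectList()
        .block() ?: emptyList()

fun main() {
    println(collectInSourceOrder())
}
```

With these delays, the "Task finished" lines should appear in completion order while the "Emitted downstream" lines and the final list follow the source order [3, 2, 1]. Swapping flatMapSequential for flatMap in the same sketch should reproduce the reordering seen in the question.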