Search code examples
javagroup-byconcurrencyproject-reactor

Why processing of this Flux hangs indefinitely on size 256?


I need to process events coming from a Flux in groups (by id) so that within an individual group each event is processed sequentially, but groups are processed in parallel. As far as I know, this can be achieved with groupBy and concatMap. When I implemented this my tests started to hang indefinitely on some big numbers of unique ids. I isolated the problem to the code below and found a specific number on which the code starts to hang - 256. I definitely don't get why this happens at all and where 256 comes from.

Here is the code which hangs:

@ParameterizedTest
@ValueSource(ints = {250, 251, 252, 253, 254, 255, 256})
void freezeTest(int uniqueStringsCount) {
  var scheduler = Schedulers
      .newBoundedElastic(
          1000,
          1000,
          "really-big-scheduler"
      );
  Flux.range(0, uniqueStringsCount)
      .map(Object::toString)
      .repeat()
      // this represents "a lot of events"
      .take(50_000)
      .groupBy(x -> x)
      // this gets the same results
      // .parallel(400)
      .parallel()
      .flatMap(group ->
          group.concatMap(e ->

              // this represents a processing operation on each event
              Mono.fromRunnable(() -> {
                    try {
                      Thread.sleep(0);
                    } catch (InterruptedException ex) {
                      throw new RuntimeException(ex);
                    }
                  })

              // this also doesn't work
              // Mono.delay(Duration.ofMillis(0))
              // Mono.empty()

          // big scheduler doesn't help either
          // ).subscribeOn(scheduler)
          )
      // ).runOn(scheduler)
      ).runOn(Schedulers.parallel())
      .then()
      .block();
}

We first construct a Flux with a lot of (50k, just an example) Strings. But there are only some number of unique strings in that Flux, which is that split up in that number of groups that are processed in parallel. But events within each group are processed sequentially via concatMap. And this code hangs only on 256 unique strings.

Initially, I thought that some thread pool somewhere is exhausted, so I added a really-big-scheduler to test that - but it only executes slower and also hangs on 256. Then I tried removing blocking Thread.sleep (I started with that since my real implementation is possibly blocking) - but it also hangs on 256 Also, changing parallelism (400 in the code above) doesn't change anything.


Solution

  • Flux.groupBy needs extra care when dealing with a large amount of groups, as stated in its javadoc:

    Note that groupBy works best with a low cardinality of groups, so chose your keyMapper function accordingly.

    The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).

    Here the prefetch amount is set too low: by default it is set to Queues.SMALL_BUFFER_SIZE, which is by default 256 (this can be changed with the property reactor.bufferSize.small). Flux.groupBy has a method to set the prefetch amount manually: Flux.groupBy(Function, int), so I suggest to replace your operator with .groupBy(x -> x, 1024) or another suitable high amount.

    The prefetch amount is important as it is the amount of uncompleted items it can process. In your case, first 255 Scheduler.createWorker() calls are made, each item is put on a Worker, and then put it and the created GroupedFlux in groupBy's internal queues waiting for the Worker to complete. When the 256th item appears before any Worker completes, it is unable to put it in the queues, and hangs.