
SubscribeOn does not change the thread pool for the whole chain


I want to trigger a long-running operation via a REST request with WebFlux. The call should only return a confirmation that the operation has started. I want the long-running operation itself to run on a different scheduler (e.g. Schedulers.single()). To achieve that I used subscribeOn:

Mono<RecalculationRequested> recalculateAll() {
  return provider.size()
      .doOnNext(size -> log.info("Size: {}", size))
      .doOnNext(size -> recalculate(size))
      .map(RecalculationRequested::new);
}

private void recalculate(int toRecalculateSize) {
  Mono.just(toRecalculateSize)
      .flatMapMany(this::toPages)
      .flatMap(page -> recalculate(page))
      .reduce(new RecalculationResult(), RecalculationResult::increment)
      .subscribeOn(Schedulers.single())
      .subscribe(result -> log.info("Result of recalculation - success:{}, failed: {}",
          result.getSuccess(), result.getFailed()));
}

private Mono<RecalculationResult> recalculate(RecalculationPage pageToRecalculate) {
  return provider.findElementsToRecalculate(pageToRecalculate.getPageNumber(), pageToRecalculate.getPageSize())
      .flatMap(this::recalculateSingle)
      .reduce(new RecalculationResult(), RecalculationResult::increment);
}

private Mono<RecalculationResult> recalculateSingle(ElementToRecalculate elementToRecalculate) {
  return recalculationTrigger.recalculate(elementToRecalculate)
      .doOnNext(result -> {
        log.info("Finished recalculation for element: {}", elementToRecalculate);
      })
      .doOnError(error -> {
        log.error("Error during recalculation for element: {}", elementToRecalculate, error);
      });
}

From the above I want to call:

private void recalculate(int toRecalculateSize)

in a different thread. However, it does not run on the single scheduler; it uses a different thread pool. I would expect subscribeOn to change the thread for the whole chain. What should I change, and why, to execute it on the single scheduler?

One more detail: the method

provider.findElementsToRecalculate(...)

uses WebClient to get elements.


Solution

  • One caveat of subscribeOn is that it does exactly what it says: it runs the act of "subscribing" on the provided Scheduler. Subscription happens at runtime and flows from bottom to top (each Subscriber subscribes to its parent Publisher).

    Documentation and presentations usually say that subscribeOn affects the whole chain. That is because most operators and sources do not change threads themselves; by default they send onNext/onComplete/onError signals from the thread on which they were subscribed.

    But as soon as one operator switches threads in that top-to-bottom data path, the reach of subscribeOn stops there. A typical example is a publishOn in the chain.

    The source of data in this case (behind WebClient) is reactor-netty and netty, which operate on their own threads and thus act as if there were a publishOn at the source.

    For WebFlux, I'd say favor using publishOn in the main chain of operators, or alternatively use subscribeOn inside of inner chains, like inside flatMap.
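
    To make the difference concrete, here is a minimal sketch, assuming reactor-core is on the classpath. The remoteCall() helper and the "fake-netty" thread name are hypothetical stand-ins for a WebClient call: the helper simply emits its value from a thread it owns, which is roughly how a netty-backed source behaves. Each method returns the name of the thread on which the operator body ran.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnReach {

    // Hypothetical stand-in for a WebClient call: the value is always emitted
    // from a thread owned by the source, much like a netty event loop would do.
    static Mono<Integer> remoteCall() {
        return Mono.<Integer>create(sink ->
                new Thread(() -> sink.success(1), "fake-netty").start());
    }

    // The source does not switch threads, so subscribeOn reaches the whole chain
    // and map runs on the single scheduler.
    static String plainSource() {
        return Mono.just(1)
                .map(v -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.single())
                .block();
    }

    // The source emits on its own thread: subscribeOn only moves the act of
    // subscribing, so map still runs on the emitting thread.
    static String subscribeOnOnly() {
        return remoteCall()
                .map(v -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.single())
                .block();
    }

    // publishOn in the main chain: every operator below it runs on the
    // single scheduler, regardless of the thread the source emitted on.
    static String withPublishOn() {
        return remoteCall()
                .publishOn(Schedulers.single())
                .map(v -> Thread.currentThread().getName())
                .block();
    }

    // subscribeOn inside an inner chain: the callable is executed when the
    // inner Mono is subscribed, which happens on the single scheduler.
    static String innerSubscribeOn() {
        return remoteCall()
                .flatMap(v -> Mono.fromCallable(() -> Thread.currentThread().getName())
                        .subscribeOn(Schedulers.single()))
                .block();
    }

    public static void main(String[] args) {
        System.out.println("plain source + subscribeOn: " + plainSource());
        System.out.println("remote source + subscribeOn: " + subscribeOnOnly());
        System.out.println("remote source + publishOn:   " + withPublishOn());
        System.out.println("inner subscribeOn in flatMap: " + innerSubscribeOn());
    }
}
```

    Only the second variant should report the source's own thread; the others should report a thread of the single scheduler (typically named like single-1). Applied to the question, this suggests that placing publishOn(Schedulers.single()) directly after the WebClient-backed call, for example after provider.findElementsToRecalculate(...), would move the operators below it onto the single scheduler, as long as no later step switches threads again.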