
Reactor - understanding thread pools in .flatMap()


I am trying to understand how reactive programming really works. I prepared a simple demo for this purpose: a reactive WebClient from Spring Framework sends requests to a simple REST API, and the client prints the name of the current thread in each operation.

The REST API:

@RestController
@SpringBootApplication
public class RestApiApplication {

    public static void main(String[] args) {
        SpringApplication.run(RestApiApplication.class, args);
    }

    @PostMapping("/resource")
    public void consumeResource(@RequestBody Resource resource) {
        System.out.println(String.format("consumed resource: %s", resource.toString()));
    }
}

@Data
@AllArgsConstructor
class Resource {
    private final Long id;
    private final String name;
}

And the most important part, the reactive web client:

@SpringBootApplication
public class ReactorWebclientApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReactorWebclientApplication.class, args);
    }

    private final TcpClient tcpClient = TcpClient.create();

    private final WebClient webClient = WebClient.builder()
        .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))
        .baseUrl("http://localhost:8080")
        .build();

    @PostConstruct
    void doRequests() {
        var longs = LongStream.range(1L, 10_000L)
            .boxed()
            .toArray(Long[]::new);

        var longsStream = Stream.of(longs);

        Flux.fromStream(longsStream)
            .map(l -> {
                System.out.println(String.format("------- map [%s] --------", Thread.currentThread().getName()));
                return new Resource(l, String.format("name %s", l));
            })
            .filter(res -> {
                System.out.println(String.format("------- filter [%s] --------", Thread.currentThread().getName()));
                return !res.getId().equals(11_000L);
            })
            .flatMap(res -> {
                System.out.println(String.format("------- flatmap [%s] --------", Thread.currentThread().getName()));
                return webClient.post()
                    .uri("/resource")
                    .syncBody(res)
                    .header("Content-Type", "application/json")
                    .header("Accept", "application/json")
                    .retrieve()
                    .bodyToMono(Resource.class)
                    .doOnSuccess(ignore -> System.out.println(String.format("------- onsuccess [%s] --------", Thread.currentThread().getName())))
                    .doOnError(ignore -> System.out.println(String.format("------- onerror [%s] --------", Thread.currentThread().getName())));
            })
            .blockLast();
    }

}

@JsonIgnoreProperties(ignoreUnknown = true)
class Resource {
    private final Long id;
    private final String name;

    @JsonCreator
    Resource(@JsonProperty("id") Long id, @JsonProperty("name")  String name) {
        this.id = id;
        this.name = name;
    }

    Long getId() {
        return id;
    }

    String getName() {
        return name;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("Resource{");
        sb.append("id=").append(id);
        sb.append(", name='").append(name).append('\'');
        sb.append('}');
        return sb.toString();
    }
}

The problem is that the behaviour differs from what I predicted.

I expected that each call of .map(), .filter() and .flatMap() would be executed on the main thread, and that each call of .doOnSuccess() or .doOnError() would be executed on a thread from the NIO thread pool. So I expected logs that look like this:

------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
(and so on...)
------- onsuccess [reactor-http-nio-2] --------
(and so on...)

But the logs I actually got are:

------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
------- onsuccess [reactor-http-nio-2] --------
------- onsuccess [reactor-http-nio-6] --------
------- onsuccess [reactor-http-nio-4] --------
------- onsuccess [reactor-http-nio-8] --------
------- map [reactor-http-nio-2] --------
------- filter [reactor-http-nio-2] --------
------- flatmap [reactor-http-nio-2] --------
------- map [reactor-http-nio-2] --------

After that, every subsequent log from .map(), .filter() and .flatMap() came from a reactor-http-nio thread.

Another thing I cannot explain is that the ratio between operations executed on the main thread and on reactor-http-nio threads is different on every run. Sometimes all .map(), .filter() and .flatMap() operations are performed on the main thread.


Solution

  • Reactor, like RxJava, can be considered to be concurrency-agnostic. That is, it does not enforce a concurrency model. Rather, it leaves you, the developer, in command. However, that does not prevent the library from helping you with concurrency.

    Obtaining a Flux or a Mono does not necessarily mean that it runs in a dedicated Thread. Instead, most operators continue working in the Thread on which the previous operator executed. Unless specified, the topmost operator (the source) itself runs on the Thread in which the subscribe() call was made.

    The passage above comes from the "Threading and Schedulers" section of the Project Reactor reference documentation.
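
The rule quoted above (a step runs on whichever thread delivers the signal to it) can be seen without Reactor at all. The sketch below is a JDK-only analogy built on CompletableFuture, not Reactor's API; the class name CallbackThreadDemo and the thread name io-worker are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class CallbackThreadDemo {

    /** Returns the name of the thread that ran a callback registered before completion. */
    static String callbackThread() {
        // Stands in for a pending HTTP response.
        CompletableFuture<String> response = new CompletableFuture<>();

        // Registered up front by the calling thread, like an operator chained onto a pending call.
        CompletableFuture<String> ranOn =
            response.thenApply(body -> Thread.currentThread().getName());

        // A separate thread completes the future, like an I/O thread delivering the response.
        Thread ioThread = new Thread(() -> response.complete("ok"), "io-worker");
        ioThread.start();

        // Waits until the callback has run and returns the thread name it observed.
        return ranOn.join();
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + callbackThread());
    }
}
```

The callback is declared on the calling thread but executed by the thread that completes the future, which mirrors why .doOnSuccess() in the question logs a reactor-http-nio thread.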

    From your code, the following snippet:

    webClient.post()
             .uri("/resource")
             .syncBody(res)
             .header("Content-Type", "application/json")
             .header("Accept", "application/json")
             .retrieve()
             .bodyToMono(Resource.class)
    

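    leads to a thread switch from main to Netty's worker pool: the HTTP response is delivered on a reactor-http-nio thread, so .doOnSuccess() runs there. When an inner Mono completes, .flatMap() requests the next element from upstream on that same thread, and because Flux.fromStream() emits synchronously on the requesting thread, the following .map(), .filter() and .flatMap() calls run on the Netty worker thread as well. Roughly speaking, elements requested while main is still emitting are handled on main and later demand is served on Netty threads, so the ratio between the two depends on timing.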
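
Why .map() and .filter() follow the switch comes down to demand: a synchronous source emits on whichever thread calls request(n). The sketch below shows that with the JDK's java.util.concurrent.Flow interfaces (Java 9+). It is a deliberately simplified illustration, not Reactor's implementation and not a spec-compliant publisher (no re-entrancy or concurrency protection); all class and thread names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class RequestThreadDemo {

    /** Simplified synchronous source: emits on the thread that calls request(n). */
    static final class CountingSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Long> subscriber;
        private long next = 1;

        CountingSubscription(Flow.Subscriber<? super Long> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            for (long i = 0; i < n; i++) {
                subscriber.onNext(next++);
            }
        }

        @Override
        public void cancel() {
            // nothing to release in this sketch
        }
    }

    /** Records "item@thread" for every element it receives. */
    static final class RecordingSubscriber implements Flow.Subscriber<Long> {
        final List<String> log = new ArrayList<>();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(2); // initial demand, issued by the subscribing thread
        }

        @Override
        public void onNext(Long item) {
            log.add(item + "@" + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable error) { }

        @Override
        public void onComplete() { }
    }

    static List<String> run() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        CountingSubscription subscription = new CountingSubscription(subscriber);

        // Initial demand comes from the thread that subscribes.
        runOn("subscribing-thread", () -> subscriber.onSubscribe(subscription));
        // Later demand comes from another thread, like a Netty thread finishing a request.
        runOn("io-worker", () -> subscription.request(2));

        return subscriber.log;
    }

    /** Runs the task on a named thread and waits for it to finish. */
    private static void runOn(String name, Runnable task) {
        Thread thread = new Thread(task, name);
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Here run() returns the first two elements tagged with subscribing-thread and the next two tagged with io-worker: the source itself never chose a thread, it simply emitted on the thread that asked for more.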

    If you want to control this behavior, add a publishOn(...) operator to the chain, for example:

    webClient.post()
             .uri("/resource")
             .syncBody(res)
             .header("Content-Type", "application/json")
             .header("Accept", "application/json")
             .retrieve()
             .bodyToMono(Resource.class)
             .publishOn(Schedulers.elastic())
    

    This way, every subsequent action runs on a thread from the elastic scheduler's pool. (Since Reactor 3.4, Schedulers.elastic() is deprecated in favor of Schedulers.boundedElastic().)

    Another example is using a dedicated scheduler for heavy tasks that follow the HTTP request:
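
The effect of moving downstream work onto a dedicated pool can also be sketched with plain JDK classes. This is an analogy to publishOn(...), not Reactor code: thenApplyAsync with an explicit executor plays the role of the hop, and the names ExecutorHopDemo, io-worker and heavy-worker are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorHopDemo {

    /** Returns {thread of the stage before the hop, thread of the stage after the hop}. */
    static String[] stageThreads() {
        // Dedicated single-thread pool for the follow-up work.
        ExecutorService heavyPool =
            Executors.newSingleThreadExecutor(task -> new Thread(task, "heavy-worker"));
        try {
            // Stands in for a pending HTTP response.
            CompletableFuture<String> response = new CompletableFuture<>();

            // No executor given: runs on the thread that completes the response.
            CompletableFuture<String> beforeHop =
                response.thenApply(body -> Thread.currentThread().getName());

            // Explicit executor: this stage and its work move to the dedicated pool.
            CompletableFuture<String> afterHop =
                beforeHop.thenApplyAsync(ignored -> Thread.currentThread().getName(), heavyPool);

            // The "I/O" thread delivers the response.
            new Thread(() -> response.complete("ok"), "io-worker").start();

            return new String[] { beforeHop.join(), afterHop.join() };
        } finally {
            heavyPool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = stageThreads();
        System.out.println("before hop: " + threads[0]);
        System.out.println("after hop:  " + threads[1]);
    }
}
```

The design point is the same as with publishOn(...): the I/O thread is released as soon as it hands the value over, and slow follow-up work occupies only the pool you chose for it.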

    import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
    import static com.github.tomakehurst.wiremock.client.WireMock.get;
    import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
    
    import com.github.tomakehurst.wiremock.WireMockServer;
    import java.util.concurrent.TimeUnit;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;
    import org.springframework.web.reactive.function.client.ClientResponse;
    import org.springframework.web.reactive.function.client.WebClient;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    import ru.lanwen.wiremock.ext.WiremockResolver;
    import ru.lanwen.wiremock.ext.WiremockResolver.Wiremock;
    import ru.lanwen.wiremock.ext.WiremockUriResolver;
    import ru.lanwen.wiremock.ext.WiremockUriResolver.WiremockUri;
    
    @ExtendWith({
      WiremockResolver.class,
      WiremockUriResolver.class
    })
    public class ReactiveThreadsControlTest {
    
      private static int concurrency = 1;
    
      private final WebClient webClient = WebClient.create();
    
      @Test
      public void slowServerResponsesTest(@Wiremock WireMockServer server, @WiremockUri String uri) {
    
        String requestUri = "/slow-response";
    
        server.stubFor(get(urlEqualTo(requestUri))
          .willReturn(aResponse().withStatus(200)
            .withFixedDelay((int) TimeUnit.SECONDS.toMillis(2)))
        );
    
        Flux
          .generate(() -> Integer.valueOf(1), (i, sink) -> {
            System.out.println(String.format("[%s] Emitting next value: %d", Thread.currentThread().getName(), i));
            sink.next(i);
            return i + 1;
          })
          .subscribeOn(Schedulers.single())
          .flatMap(i ->
              executeGet(uri + requestUri)
                .publishOn(Schedulers.elastic())
                .map(response -> {
                  heavyTask();
                  return true;
                })
            , concurrency)
          .subscribe();
    
        blockForever();
      }
    
      private void blockForever() {
        Object monitor = new Object();
    
        synchronized (monitor) {
          try {
            monitor.wait();
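          } catch (InterruptedException ex) {
            // Restore the interrupt flag instead of silently swallowing it.
            Thread.currentThread().interrupt();
          }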
        }
      }
    
    
      private Mono<ClientResponse> executeGet(String path) {
        System.out.println(String.format("[%s] About to execute an HTTP GET request: %s", Thread.currentThread().getName(), path));
        return webClient
          .get()
          .uri(path)
          .exchange();
      }
    
      private void heavyTask() {
        try {
          System.out.println(String.format("[%s] About to execute a heavy task", Thread.currentThread().getName()));
          Thread.sleep(TimeUnit.SECONDS.toMillis(20));
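        } catch (InterruptedException ex) {
          // Restore the interrupt flag instead of silently swallowing it.
          Thread.currentThread().interrupt();
        }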
      }
    }