java multithreading reactive-programming project-reactor

What does the term "concurrency agnostic" mean exactly?


I am trying to understand the Java reactive library and I came across the term "concurrency-agnostic" here. Can someone provide a small working example to help me understand it better?

Does it mean that the developer has to make sure that his code will work fine and will achieve concurrency?

P.S. I am very new to concurrent/parallel applications.


Solution

  • This is what the doc says:

    ...it does not enforce a concurrency model. Rather, it leaves you, the developer, in command. However, that does not prevent the library from helping you with concurrency.

    Obtaining a Flux or a Mono does not necessarily mean that it runs in a dedicated Thread. Instead, most operators continue working in the Thread on which the previous operator executed

    Let's see a couple of examples:

    1) Non concurrent/parallel execution. By default Project Reactor doesn't enforce concurrency. That's why the following Mono runs on the same thread (main) where it was subscribed:

            Logger logger = LoggerFactory.getLogger(MyApplication.class);
    
            Mono<String> helloMono = Mono.defer(() -> Mono.just("Hello!"))
                    .doOnNext(logger::info);
    
            helloMono.subscribe();
    

    Logs:

    22:06:15.235 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
    22:06:19.841 [main] INFO MyApplication - Hello!
    

    2) Parallel execution. We can make the Mono execute on a different thread by using the subscribeOn() operator. Notice how the Mono now runs on the elastic-2 thread. Because that work happens off the main thread, we need a CountDownLatch to make the program wait for it:

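            Logger logger = LoggerFactory.getLogger(MyApplication.class);
            // Lets the main thread wait until the Mono terminates on the other thread.
            CountDownLatch latch = new CountDownLatch(1);
    
            Mono<String> helloMono = Mono.defer(() -> Mono.just("Hello!"))
                    .doOnNext(logger::info)
                    .doOnTerminate(latch::countDown)
                    // Subscribe (and therefore emit) on a scheduler thread instead of main.
                    // Note: Schedulers.elastic() is deprecated since Reactor 3.4 in favor of
                    // Schedulers.boundedElastic().
                    .subscribeOn(Schedulers.elastic());
    
            helloMono.subscribe();
    
            // Blocks main until countDown() is called; throws InterruptedException.
            latch.await();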
    

    Logs:

    22:11:26.704 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
    22:11:26.733 [elastic-2] INFO MyApplication - Hello!
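
    To tie this back to the quoted sentence about operators staying on the thread of the previous operator, here is a minimal sketch (not taken from the Reactor docs) that records which thread runs a step before and after a publishOn() call. The class name ThreadHopDemo, the run() helper and the scheduler name "worker" are made up for illustration, and it assumes reactor-core is on the classpath.

    ```java
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Scheduler;
    import reactor.core.scheduler.Schedulers;

    class ThreadHopDemo {

        // Runs a tiny pipeline and returns entries such as "before:<thread name>".
        static List<String> run() {
            List<String> seen = new CopyOnWriteArrayList<>();
            // Dedicated single-threaded scheduler; "worker" is an arbitrary name.
            Scheduler worker = Schedulers.newSingle("worker");
            try {
                Flux.just("a", "b")
                    // No scheduler has been chosen yet: this runs on the subscribing thread.
                    .doOnNext(v -> seen.add("before:" + Thread.currentThread().getName()))
                    // From here on, downstream operators run on the worker scheduler.
                    .publishOn(worker)
                    .map(String::toUpperCase)
                    .doOnNext(v -> seen.add("after:" + Thread.currentThread().getName()))
                    // Subscribes and blocks the caller until the Flux completes.
                    .blockLast();
            } finally {
                worker.dispose();
            }
            return seen;
        }

        public static void main(String[] args) {
            run().forEach(System.out::println);
        }
    }
    ```

    When launched from main, the two "before" entries should show the main thread and the two "after" entries a thread whose name contains "worker". Nothing changes threads until an operator such as publishOn() or subscribeOn() says so, which is the "you are in command" part of the quote.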
    

    Frameworks such as WebFlux take an approach similar to the second scenario: I/O operations do not tie up the calling thread (the HTTP thread) and are instead carried out on other thread pools.
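
    As a rough illustration of that idea (this is not how WebFlux is implemented internally, only a common Reactor pattern), a blocking call can be wrapped in Mono.fromCallable() and moved off the calling thread with subscribeOn(). The class BlockingOffloadDemo and the slowLookup() method are hypothetical stand-ins for real blocking I/O, and Schedulers.boundedElastic() assumes Reactor 3.3 or newer.

    ```java
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    class BlockingOffloadDemo {

        // Hypothetical blocking operation, e.g. a JDBC query or a file read.
        static String slowLookup() {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "result from " + Thread.currentThread().getName();
        }

        // The callable is only invoked on subscription, and subscribeOn()
        // makes that happen on a boundedElastic thread, not on the caller.
        static Mono<String> lookup() {
            return Mono.fromCallable(BlockingOffloadDemo::slowLookup)
                       .subscribeOn(Schedulers.boundedElastic());
        }

        public static void main(String[] args) {
            // block() is used here only so the demo can print the result before exiting.
            System.out.println(lookup().block());
        }
    }
    ```

    The printed thread name should belong to the boundedElastic pool instead of main. In a real reactive application you would return the Mono to the framework and let it subscribe, not call block() yourself.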