
Different threads for a Flux consumer


In the two examples below, a Flux stream appears to be processed differently.

Example 1:

public static void main(String[] args) throws InterruptedException {
    log.debug(" Before reading flux stream");
    Flux<Customer> results = readFluxStream();
    log.debug(Thread.currentThread().getName() + " : After reading flux stream");
    // Because of delayElements, subscribe() returns before any element is delivered.
    results.subscribe(new LearnFlux().new CustomerConsumer());
    // Keep the JVM alive long enough for the delayed elements to arrive.
    Thread.sleep(5000);
    log.debug(" Exit Main Thread ");
}

public static Flux<Customer> readFluxStream() {
    List<Customer> customers = buildCustomers();
    Customer[] customerArray = customers.toArray(new Customer[0]);
    // Emit one customer per second and log every reactive signal.
    return Flux.fromArray(customerArray).delayElements(Duration.ofSeconds(1)).log();
}

private class CustomerConsumer implements Consumer<Customer> {

    @Override
    public void accept(Customer customer) {
        // Log the name of the thread on which the element is consumed.
        log.debug(Thread.currentThread().getName() + " This is a consumer " + customer.getFirstName());
    }
}

The logs below show that the Flux consumer runs on a different thread (highlighted with ***). I added a sleep in the main thread so that the consumer's logs are captured in the console.

19:07:24.695 [***main***] DEBUG com.learnjava.LearnFlux -  Before reading flux stream
19:07:24.759 [***main***] DEBUG reactor.util.Loggers - Using Slf4j logging framework
19:07:24.779 [***main***] DEBUG com.learnjava.LearnFlux - main : After reading flux stream
19:07:24.788 [***main***] INFO  reactor.Flux.ConcatMap.1 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
19:07:24.790 [***main***] INFO  reactor.Flux.ConcatMap.1 - request(unbounded)
19:07:25.821 [***parallel-1***] INFO  reactor.Flux.ConcatMap.1 - onNext(Customer[id=null, firstName='Tom', lastName='Cruise'])
19:07:25.835 [***parallel-1***] DEBUG com.learnjava.LearnFlux - parallel-1 This is a consumer Tom
19:07:26.841 [***parallel-2***] INFO  reactor.Flux.ConcatMap.1 - onNext(Customer[id=null, firstName='Jim', lastName='Carry'])
19:07:26.842 [***parallel-2***] DEBUG com.learnjava.LearnFlux - parallel-2 This is a consumer Jim
19:07:26.844 [***parallel-2***] INFO  reactor.Flux.ConcatMap.1 - onComplete()
19:07:29.817 [***main***] DEBUG com.learnjava.LearnFlux -  Exit Main Thread 

Example 2:

public interface CustomerRepository extends ReactiveCrudRepository<Customer, Long> {

    @Query("SELECT * FROM customer WHERE last_name = :lastname")
    Flux<Customer> findByLastName(String lastName);

}


public class CustomerConsumer implements Consumer<Customer> {

    private static final Logger log = LoggerFactory.getLogger(CustomerConsumer.class);

    @Override
    public void accept(Customer customer) {
        log.info(" This is a concusmer " + customer);
    }
}



log.info(" Invoking R2DBC flux response ");
Flux<Customer> customers = repository.findAll();
customers.subscribe(new CustomerConsumer());
log.info("complete consumer in main thread");

The logs below show that the consumer runs on the same main thread (highlighted with ***).

[***main***] Invoking R2DBC flux response 
[***main***] This is a concusmer Customer[id=1, firstName='Jack', lastName='Bauer']
[***main***] This is a concusmer Customer[id=2, firstName='Chloe', lastName='O'Brian']
[***main***] This is a concusmer Customer[id=3, firstName='Michelle', lastName='Dessler']
[***main***] complete consumer in main thread

Clarification:

Why does the Flux consumer in the first example run on a different thread, whereas the Flux returned by the R2DBC-based repository (second example) is processed on the main thread?

I am trying to understand the real differences between R2DBC and plain JDBC, and the benefits of using R2DBC.


Solution

  • Why does the Flux consumer in the first example run on a different thread, whereas the Flux returned by the R2DBC-based repository (second example) is processed on the main thread?

    The key point is that any reactive operator can switch threads (or, more precisely, schedulers) as it sees fit. Most operators don't switch, but time-based operators have to, and they default to the parallel scheduler.

    In the first example, you're using the delayElements() operator. As a time-based operator, it switches to the parallel scheduler by default, which runs its tasks on the parallel thread pool (hence the parallel-1 and parallel-2 threads in your logs). Time-based operators have to switch because the "immediate" scheduler, which would keep your operations on the same thread, isn't capable of the time-based scheduling that delayElements() requires.
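A minimal sketch of the behaviour described above, assuming reactor-core is on the classpath. The class name DelayThreadDemo and the helper deliveryThreads are made up for illustration, and blockLast() is used only to wait for completion in place of the Thread.sleep from the question.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;

public class DelayThreadDemo {

    // Hypothetical helper: returns the name of the thread on which
    // each delayed element was delivered to the downstream callback.
    static List<String> deliveryThreads(Duration delay) {
        List<String> names = new CopyOnWriteArrayList<>();
        Flux.just("Tom", "Jim")
            .delayElements(delay) // no scheduler given: defaults to Schedulers.parallel()
            .doOnNext(name -> names.add(Thread.currentThread().getName()))
            .blockLast();         // block the caller until onComplete
        return names;
    }

    public static void main(String[] args) {
        System.out.println("caller thread:    " + Thread.currentThread().getName());
        // Expect names beginning with "parallel-", not the caller's name.
        System.out.println("delivery threads: " + deliveryThreads(Duration.ofMillis(200)));
    }
}
```

The exact thread numbers can differ between runs; what matters is that none of the elements is delivered on the thread that subscribed.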

    That's not to say you have to use the parallel scheduler if you have a particular reason not to: there's an overload that lets you choose the scheduler. If you use .delayElements(Duration.ofSeconds(1), Schedulers.boundedElastic()), for example, your logs will show the bounded elastic thread pool being used instead.
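As a rough illustration of that overload, the sketch below passes the scheduler in explicitly and records where the elements arrive. It assumes reactor-core is available; DelaySchedulerDemo and deliveryThreads are illustrative names, not part of the question's code.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class DelaySchedulerDemo {

    // Hypothetical helper: delays each element on the given scheduler and
    // returns the names of the threads that delivered them.
    static List<String> deliveryThreads(Scheduler scheduler) {
        List<String> names = new CopyOnWriteArrayList<>();
        Flux.just("Tom", "Jim")
            .delayElements(Duration.ofMillis(100), scheduler) // explicit scheduler overload
            .doOnNext(name -> names.add(Thread.currentThread().getName()))
            .blockLast(); // wait for completion
        return names;
    }

    public static void main(String[] args) {
        // Expect thread names beginning with "boundedElastic-".
        System.out.println(deliveryThreads(Schedulers.boundedElastic()));
        // Expect thread names beginning with "parallel-" (same as the default).
        System.out.println(deliveryThreads(Schedulers.parallel()));
    }
}
```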

    By contrast, in your second (R2DBC) example, no operator switches away from the immediate scheduler, so the consumer just runs on the main thread, as you see in the logs.
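The no-switching case can be sketched with a plain in-memory Flux. This is only a stand-in: Flux.just replaces the repository call, so it says nothing about what a particular R2DBC driver does internally. The publishOn line is an addition for contrast, showing one operator that does move delivery elsewhere. NoSwitchDemo and deliveryThreads are hypothetical names, and reactor-core is assumed to be on the classpath.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class NoSwitchDemo {

    // Hypothetical helper: subscribes to the given Flux and returns the
    // names of the threads on which its elements were delivered.
    static List<String> deliveryThreads(Flux<String> flux) {
        List<String> names = new CopyOnWriteArrayList<>();
        flux.doOnNext(item -> names.add(Thread.currentThread().getName()))
            .blockLast(); // wait for completion
        return names;
    }

    public static void main(String[] args) {
        // No thread-switching operator: elements arrive on the subscribing thread.
        System.out.println(deliveryThreads(Flux.just("Jack", "Chloe", "Michelle")));

        // publishOn moves downstream delivery to the chosen scheduler's threads.
        System.out.println(deliveryThreads(
                Flux.just("Jack", "Chloe", "Michelle")
                    .publishOn(Schedulers.boundedElastic())));
    }
}
```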

    If you want a deeper understanding of how threading works in Reactor, Simon Baslé's "Flight of the Flux" talk is well worth a watch (https://m.youtube.com/watch?v=sNgTTcG-fEU). There are also some accompanying blog posts.