java reactor

Why isn't this Flux executed on multiple threads?


I have the following example:

Flux.just(1,2,3,4,5,6,7,8)
    .flatMap(integer -> {
                 System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId()); 
                 return Mono.just(integer);
             }, 5)
    .repeat()
    .subscribeOn(Schedulers.parallel())
    .subscribe();

The logs are as follows:

val:4, thread:14
val:5, thread:14
val:6, thread:14
val:7, thread:14
val:8, thread:14
val:1, thread:14
val:2, thread:14
val:3, thread:14

Why is the same thread used everywhere? How can I modify the example so that it is executed on multiple threads?


Solution

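  • subscribeOn only chooses the thread on which the subscription, and therefore the source's emission, starts. Flux.just emits synchronously and Mono.just does not switch threads, so the whole chain, including every re-subscription triggered by repeat(), stays on that single parallel worker. If you want each repeated Flux to run on a different thread, replace the subscribeOn with a publishOn placed before flatMap, like this: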

    Flux.just(1,2,3,4,5,6,7,8)
            .publishOn(Schedulers.parallel()) // <- before flatMap, instead of subscribeOn at the end
            .flatMap(integer -> {
               System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId()); 
               return Mono.just(integer);
            }, 5)
            .repeat()
            .subscribe();
    

    The output now looks like this:

    val:1, thread:20
    val:2, thread:20
    val:3, thread:20
    val:4, thread:20
    val:5, thread:20
    val:6, thread:20
    val:7, thread:20
    val:8, thread:20
    val:1, thread:13
    val:2, thread:13
    val:3, thread:13
    val:4, thread:13
    val:5, thread:13
    val:6, thread:13
    val:7, thread:13
    val:8, thread:13
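
To compare the two layouts without reading thread IDs off an endless log, a bounded variant can collect the thread names each chain runs on. This is only a minimal sketch and not part of the original answer: the class name ThreadHopDemo, the scheduler name "demo", the pool size of 4 and repeat(3) are assumptions chosen so that the run terminates and more than one worker is available regardless of the machine's core count.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class ThreadHopDemo {

    // Question's layout: subscribeOn placed after repeat().
    static Set<String> threadsWithSubscribeOn() {
        Scheduler scheduler = Schedulers.newParallel("demo", 4); // assumed name and size
        Set<String> threads = ConcurrentHashMap.newKeySet();
        try {
            Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
                .flatMap(i -> {
                    threads.add(Thread.currentThread().getName());
                    return Mono.just(i);
                }, 5)
                .repeat(3)              // bounded so that blockLast() returns
                .subscribeOn(scheduler) // one worker for the whole chain
                .blockLast();
        } finally {
            scheduler.dispose();
        }
        return threads;
    }

    // Answer's layout: publishOn placed before flatMap, no subscribeOn.
    static Set<String> threadsWithPublishOn() {
        Scheduler scheduler = Schedulers.newParallel("demo", 4); // assumed name and size
        Set<String> threads = ConcurrentHashMap.newKeySet();
        try {
            Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
                .publishOn(scheduler)   // a worker is requested on every (re-)subscription
                .flatMap(i -> {
                    threads.add(Thread.currentThread().getName());
                    return Mono.just(i);
                }, 5)
                .repeat(3)
                .blockLast();
        } finally {
            scheduler.dispose();
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("subscribeOn: " + threadsWithSubscribeOn());
        System.out.println("publishOn:   " + threadsWithPublishOn());
    }
}
```

The first set should contain a single thread name, while the second should contain several, one per repeat cycle as long as the pool has enough workers.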
    

    If you want each integer to be processed on a different thread, you can do something like this:

    Flux.just(1,2,3,4,5,6,7,8)
            .publishOn(Schedulers.parallel()) // <- each repeated subscription can be published on a different thread
            .flatMap(integer -> {
                return Mono.fromCallable(() -> {
                    System.out.println("val:" + integer + ", thread:" + Thread.currentThread().getId());
                    return integer;
                }).publishOn(Schedulers.parallel()); // <- each inner Mono can process its integer on a different thread
            })
            .repeat()
            .subscribe();
    

    The output becomes:

    val:3, thread:16
    val:2, thread:15
    val:7, thread:20
    val:8, thread:13
    val:5, thread:18
    val:6, thread:19
    val:3, thread:17
    val:5, thread:19
    val:6, thread:20
    val:1, thread:15
    val:8, thread:14
    val:4, thread:18
    val:7, thread:13
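
The per-element variant can also be checked with a run that terminates. The sketch below is an illustration under stated assumptions rather than the answer's exact code: it drops repeat() and the outer publishOn, and it uses subscribeOn on the inner Mono (instead of the publishOn shown above) so that the callable itself runs on the scheduler. The class name PerElementThreads, the scheduler name "demo" and the pool size of 4 are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class PerElementThreads {

    // Maps each emitted value to the name of the thread that processed it.
    static Map<Integer, String> threadPerValue() {
        Scheduler scheduler = Schedulers.newParallel("demo", 4); // assumed name and size
        Map<Integer, String> seen = new ConcurrentHashMap<>();
        try {
            Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
                .flatMap(i -> Mono.fromCallable(() -> {
                        // The work for one element; runs on a scheduler thread.
                        seen.put(i, Thread.currentThread().getName());
                        return i;
                    })
                    .subscribeOn(scheduler)) // each inner Mono is subscribed on the scheduler
                .blockLast();                // wait until all eight inner Monos finish
        } finally {
            scheduler.dispose();
        }
        return seen;
    }

    public static void main(String[] args) {
        threadPerValue().forEach((value, thread) ->
            System.out.println("val:" + value + ", thread:" + thread));
    }
}
```

Because flatMap subscribes to the inner Monos eagerly, the eight callables are spread across the pool and the values may complete in any order, which matches the interleaved log above.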