reactive-programming spring-webflux reactive-streams

subscribeOn(Schedulers.parallel()) is not working


I am learning Reactor Core and following this tutorial: https://www.baeldung.com/reactor-core

ArrayList<Integer> arrList = new ArrayList<Integer>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i * 2)
  .subscribeOn(Schedulers.parallel())
  .subscribe(arrList::add);

System.out.println("After: " + arrList);

When I execute the code above, it prints:

 Before: []
 [DEBUG] (main) Using Console logging
 After: []

The code above should run on another thread, but it does not seem to work at all. Can somebody help me with this?


Solution

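  • I think there is some confusion here. Calling subscribeOn(Schedulers.parallel()) moves the subscription, and with it the emission and processing of the items, onto a thread of the parallel scheduler. subscribe() therefore returns immediately, and the main thread reaches the "After" println before the worker thread has added anything. Your pipeline does run on another thread; you just read the list too early. There is no magic synchronization mechanism in Reactor that makes the main thread wait for the pipeline, and because the parallel scheduler's threads are daemon threads, the JVM can even exit before they have done any work. You have to keep the main thread waiting long enough for the subscriber to actually kick in, which is why I added Thread.sleep(100). If you run the code below, it works.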

        ArrayList<Integer> arrList = new ArrayList<Integer>();
    
        Flux.just(1, 2, 3, 4)
                .log()
                .map(i -> i * 2)
                .subscribeOn(Schedulers.parallel())
                .subscribe(
                        t -> {
                            System.out.println(t + " thread id: " + Thread.currentThread().getId());
                            arrList.add(t);
                        }
                );
        System.out.println("size of arrList(before the wait): " + arrList.size());
        System.out.println("Thread id: "+ Thread.currentThread().getId() + ": id of main thread ");
        Thread.sleep(100);
        System.out.println("size of arrList(after the wait): " + arrList.size());
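
    Thread.sleep(100) only works as long as the pipeline finishes within 100 ms. The timing problem itself can be reproduced without Reactor, so here is a minimal sketch using only the JDK. The single daemon-thread executor stands in for Schedulers.parallel() purely for illustration (it is not how Reactor is implemented), and the names AsyncWaitSketch and doubleOnWorker are made up for this example. A CountDownLatch makes the caller wait exactly until the work is done instead of guessing a sleep duration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of Reactor.
class AsyncWaitSketch {

    // Doubles every input value on a background daemon thread and
    // returns only after that thread has signalled completion.
    static List<Integer> doubleOnWorker(List<Integer> input) {
        List<Integer> results = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // A daemon thread does not keep the JVM alive, so an unwaited
        // main thread could finish before this thread does any work.
        ExecutorService worker = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "worker-1");
            thread.setDaemon(true);
            return thread;
        });

        worker.execute(() -> {
            try {
                for (int value : input) {
                    results.add(value * 2);
                }
            } finally {
                done.countDown(); // signal completion, even on failure
            }
        });

        try {
            // Remove this wait and the caller may observe an empty list.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish within 5 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            worker.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println("After: " + doubleOnWorker(List.of(1, 2, 3, 4)));
    }
}
```

    In the Reactor version, the same idea would be to count the latch down from the completion and error callbacks passed to subscribe(...), or to use blockLast() instead of subscribe() when blocking the calling thread is acceptable.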
    

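    If you want to add your items to the list in parallel, Reactor is not a good choice: subscribeOn only moves the whole pipeline to one other thread, and the items are still processed one after another. It is better to use Java 8 parallel streams.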

    List<Integer> collect = Stream.of(1, 2, 3, 4)
                    .parallel()
                    .map(i -> i * 2)
                    .collect(Collectors.toList());
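
    For comparison with the Flux pipeline, here is a self-contained version of that snippet (the class and method names are made up for this example). The relevant difference is that collect is a blocking terminal operation, so the list is complete by the time the call returns and no sleep or latch is needed. Because Stream.of produces an ordered stream, Collectors.toList() keeps the encounter order even when the mapping runs in parallel.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration class.
class ParallelStreamSketch {

    // Doubles the values in parallel; returns only once all elements are collected.
    static List<Integer> doubled() {
        return Stream.of(1, 2, 3, 4)
                .parallel()
                .map(i -> i * 2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> result = doubled();
        // The list is already fully populated here.
        System.out.println("Result: " + result);
    }
}
```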
    

    The tutorial you linked is not very precise when it comes to the concurrency part. To the author's credit, he/she says that more articles are to come, but I don't think that particular example should have been posted at all, as it creates confusion. I suggest not trusting resources on the internet that much :)