I am learning reactor core and following this https://www.baeldung.com/reactor-core
ArrayList<Integer> arrList = new ArrayList<Integer>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.subscribe(arrList::add);
System.out.println("After: " + arrList);
when I execute above line of code, gives out.
Before: []
[DEBUG] (main) Using Console logging
After: []
Above lines of code should start execution in another thread but it is not working at all. Can somebody help me on this ?
I think there is some confusion. When you call subscribeOn(Schedulers.parallel())
. You specify that you want to receive items on the different thread. Also you have to slow down your code so the subscribe cen actually kick in (that is why I added Thread.sleep(100)
). If you run the code that i have passed it works. You see there is no magic synchronization mechanism in reactor.
ArrayList<Integer> arrList = new ArrayList<Integer>();
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.subscribe(
t -> {
System.out.println(t + " thread id: " + Thread.currentThread().getId());
arrList.add(t);
}
);
System.out.println("size of arrList(before the wait): " + arrList.size());
System.out.println("Thread id: "+ Thread.currentThread().getId() + ": id of main thread ");
Thread.sleep(100);
System.out.println("size of arrList(after the wait): " + arrList.size());
If you want to add your items to the list in parallel reactor is not a good choice. Better to use parallel streams in java 8.
List<Integer> collect = Stream.of(1, 2, 3, 4)
.parallel()
.map(i -> i * 2)
.collect(Collectors.toList());
That tutorial you posted is not very precise when it comes to concurrency part. To the author credit he/she says that more articles is to come. But I don't think that should post that particular example at all as it creates confusion. I suggest not trusting resources on the internet that much :)