I recently explored the Project Reactor library and am trying to use it for a use case where I have a list of tasks: some tasks depend on the execution of other tasks, and some can execute in parallel for the sake of performance. The execution order forms a directed acyclic graph. Below is the POC code for that:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ReactorPOC {

    public static void main(String[] args) {
        // The first execution takes a long time
        run();
        // All subsequent executions take the expected time
        run();
        run();
    }

    public static void run() {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        try {
            long st = System.currentTimeMillis();
            // cache() lets a task be referenced several times in the graph but run only once
            Publisher<String> one = getTask(60, "one", executorService, st).cache();
            Publisher<String> two = getTask(60, "two", executorService, st).cache();
            Publisher<String> three = getTask(60, "three", executorService, st).cache();
            Publisher<String> four = getTask(60, "four", executorService, st).cache();
            Publisher<String> eight = getTask(60, "eight", executorService, st).cache();
            Publisher<String> five = getTask(60, "five", executorService, st).cache();
            Publisher<String> six = getTask(60, "six", executorService, st).cache();
            Publisher<String> seven = getTask(60, "seven", executorService, st).cache();
            // three runs after one and two; five runs after three, four and eight
            three = Flux.concat(Flux.merge(one, two), three);
            five = Flux.concat(Flux.merge(three, four, eight), five);
            // six and seven both run after five
            six = Flux.concat(five, six);
            seven = Flux.concat(five, seven);
            Flux<String> last = Flux.merge(one, two, three, four, five, six, seven, eight);
            last.blockLast();
            System.out.println(System.currentTimeMillis() - st);
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            // Without this, the pool's non-daemon threads keep the JVM alive
            executorService.shutdown();
        }
    }

    static Mono<String> getTask(int sleep, String task, ExecutorService executorService, long st) {
        return Mono.just(task).doOnSubscribe(i -> {
            System.out.println("Starting " + task + " at " + (System.currentTimeMillis() - st));
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            System.out.println("Ending " + task + " at " + (System.currentTimeMillis() - st));
        }).subscribeOn(Schedulers.fromExecutor(executorService));
    }
}
This works as expected in terms of execution order, but I have two questions:
1. I executed the graph three times (calling run() three times from main). The first run takes around 1200 ms, which is too long; every subsequent run takes around 250 ms, which is the expected time. I am trying to understand why the first run takes so long.
2. If any of the tasks fails, I want a way to throw an exception and stop proceeding further in the execution order. Is there any way to do that? One approach I thought of is to keep a shared object and set an error field on it; every subsequent task would check this field first and then decide not to execute. I wanted to check whether there is a better way of doing this.
Please help clarify the above two queries.
Also, I am new to this library and to the reactive paradigm as a whole, so any input or suggestions on the above code would be great.
Thanks :)
As far as I can see from running with AsyncProfiler and looking at a flame graph, it seems to be purely a matter of class loading.
The code could be improved, notably by removing the caching and by avoiding blocking inside doOnSubscribe (which is a code smell), but that doesn't change the outcome very much.
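As a rough illustration of those two improvements, here is a minimal sketch that moves the blocking work into Mono.fromCallable and composes the same dependency graph so that each task is subscribed exactly once, which makes cache() unnecessary. The class name CallableTaskSketch, the helper names task and runGraph, and the choice of Schedulers.boundedElastic() (available in Reactor 3.3 and later) are assumptions made for this example and are not taken from the original code.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class CallableTaskSketch {

    // Hypothetical helper: the blocking work runs inside the callable, on the
    // given scheduler, instead of inside a doOnSubscribe side effect.
    static Mono<String> task(String name, long sleepMillis, Scheduler scheduler) {
        return Mono.fromCallable(() -> {
            Thread.sleep(sleepMillis); // stands in for real blocking work
            return name;
        }).subscribeOn(scheduler);
    }

    // Same graph as in the question: three after one and two; five after
    // three, four and eight; six and seven after five.
    static List<String> runGraph() {
        Scheduler scheduler = Schedulers.boundedElastic(); // assumption: Reactor 3.3+
        Mono<String> one = task("one", 60, scheduler);
        Mono<String> two = task("two", 60, scheduler);
        Mono<String> three = task("three", 60, scheduler);
        Mono<String> four = task("four", 60, scheduler);
        Mono<String> five = task("five", 60, scheduler);
        Mono<String> six = task("six", 60, scheduler);
        Mono<String> seven = task("seven", 60, scheduler);
        Mono<String> eight = task("eight", 60, scheduler);

        // Each Mono appears once, so each task is subscribed (and run) once.
        Flux<String> graph = Flux.concat(
                Flux.merge(Flux.concat(Flux.merge(one, two), three), four, eight),
                five,
                Flux.merge(six, seven));
        return graph.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("Completed tasks: " + runGraph());
    }
}
```

With this shape, an exception thrown inside a callable is turned into an error signal. Flux.concat forwards that error and does not subscribe to the sources that come after it, and block() rethrows it to the caller, so a shared error flag may not be needed. Tasks already running in parallel inside a merge can still be in flight when the error arrives.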
To better demonstrate, I copy-pasted the run() method as run2() and had main execute run1() then run2() (which are the exact same code). We can then observe that run1() always takes more time, and the flame graph shows that the difference lies in class loading.
If we launch run2() first, it becomes the one that is prominent in the flame graph, with class-loading delays.
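The same experiment can be approximated without a profiler by timing identical work several times inside one JVM. The sketch below uses only the JDK; the class name WarmupTimingSketch and the helper timeRuns are hypothetical, and the placeholder lambda is meant to be swapped for the code under test (for example ReactorPOC::run). It only measures elapsed time per run and does not by itself attribute the difference to class loading.

```java
import java.util.ArrayList;
import java.util.List;

class WarmupTimingSketch {

    // Hypothetical helper: runs the same work several times in one JVM and
    // records how long each run took, in milliseconds.
    static List<Long> timeRuns(Runnable work, int runs) {
        List<Long> elapsed = new ArrayList<>();
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();
            work.run();
            elapsed.add((System.nanoTime() - start) / 1_000_000);
        }
        return elapsed;
    }

    public static void main(String[] args) {
        // Placeholder work; replace with the code under test, e.g. ReactorPOC::run.
        List<Long> times = timeRuns(() -> String.format("%d tasks", 8), 3);
        System.out.println("Elapsed per run (ms): " + times);
    }
}
```

If the first entry is consistently much larger than the later ones regardless of which copy of the method runs first, that is consistent with a one-time start-up cost such as class loading rather than with anything specific to the task graph.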