I am trying to run an async method on a schedule, every 5 minutes, in Spring to process 1000s of tasks using 100 threads. At the end of every run I need to figure out how many tasks failed and how many succeeded. I tried using CompletableFuture with the sample code below, but I am facing two main issues.
// ScheduledTask
public void processTask() {
    List<CompletableFuture<Integer>> futures = new ArrayList<>();
    for (int i = 0; i < 300; i++) {
        futures.add(service.performTask(i));
    }
    // Block until every task has completed
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
// MyAsyncService
@Async
public CompletableFuture<Integer> performTask(int i) {
    try {
        Thread.sleep(1000);
        final Thread currentThread = Thread.currentThread();
        final String oldName = currentThread.getName();
        currentThread.setName(oldName + "-" + i);
        // Generate a random number between 0 and 10 and divide i+10 by it,
        // so that some tasks fail randomly (division by zero).
        int test = (int) (i + 10) / RandomNumber(0, 10);
        return CompletableFuture.completedFuture(i);
    } catch (Exception e) {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        f.completeExceptionally(e);
        return f;
    }
}
// MyAsyncConfig
public Executor getAsyncExecutor() {
    final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setThreadNamePrefix("async-thread-");
    threadPoolTaskExecutor.setCorePoolSize(100);
    threadPoolTaskExecutor.setMaxPoolSize(300);
    threadPoolTaskExecutor.initialize();
    return threadPoolTaskExecutor;
}
Hmmm, I think we still need the join() to wait for the completion of the whole set of CompletableFutures in the scheduled task :)
Another thing: to make sure the scheduled task finishes normally, we need to use exceptionally() or handle() to catch the exception. Otherwise join() throws a CompletionException as soon as any of the futures has failed.
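To see that behaviour in isolation, here is a minimal sketch in plain Java (no Spring). The names JoinFailureDemo and joinThrows are made up for this example and are not part of the question's code. It checks whether allOf(...).join() throws when one future has failed, first with the raw futures and then with the failed one passed through handle().

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class JoinFailureDemo {

    // Returns true if allOf(...).join() threw because at least one future failed.
    static boolean joinThrows(List<CompletableFuture<Integer>> futures) {
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> ok = CompletableFuture.completedFuture(1);
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new ArithmeticException("/ by zero"));

        // Raw futures: the failure propagates out of join().
        System.out.println(joinThrows(Arrays.asList(ok, failed)));   // true

        // handle() turns the failure into a normal result (-1 is an arbitrary marker),
        // so join() returns normally.
        CompletableFuture<Integer> handled =
                failed.handle((value, error) -> error == null ? value : Integer.valueOf(-1));
        System.out.println(joinThrows(Arrays.asList(ok, handled)));  // false
    }
}
```

So the handle() or exceptionally() stage is what lets the scheduled method reach its logging code even when some tasks fail.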
public void processTask() {
    List<CompletableFuture<Integer>> futures = new ArrayList<>();
    // The handle() callbacks may run on different async threads,
    // so the result lists must be thread-safe.
    List<Integer> successes = Collections.synchronizedList(new ArrayList<>());
    List<Integer> failures = Collections.synchronizedList(new ArrayList<>());
    for (int i = 0; i < 10; i++) {
        int record = i;
        CompletableFuture<Integer> integerCompletableFuture = performTask(i).handle((integer, throwable) -> {
            if (throwable != null) {
                failures.add(record);
            } else {
                successes.add(record);
            }
            return integer;
        });
        futures.add(integerCompletableFuture);
    }
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    log.info("successes: {}", successes);
    log.info("failures: {}", failures);
}
This will print something like this:
[main] INFO rxjava.CompletableFutureConcurrency - successes: [0, 1, 3, 4, 5, 6, 7, 9]
[main] INFO rxjava.CompletableFutureConcurrency - failures: [2, 8]
Or something like this, if you don't need the success records:
public void processTask() {
    List<CompletableFuture<Integer>> futures = new ArrayList<>();
    // Thread-safe, because exceptionally() callbacks may run on different async threads
    List<Integer> failures = Collections.synchronizedList(new ArrayList<>());
    for (int i = 0; i < 10; i++) {
        int record = i;
        CompletableFuture<Integer> integerCompletableFuture = performTask(i)
            .exceptionally(throwable -> {
                failures.add(record);
                return record;
            });
        futures.add(integerCompletableFuture);
    }
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    log.info("failures: {}", failures);
}
This will print something like this:
[main] INFO rxjava.CompletableFutureConcurrency - failures: [2, 8]
PS: Here I used int record = i. You can play around without it to find out why I did so ;)
Updated on 26.03.2022
The async execution of performTask is handled by the @Async annotation. As asked by @user3134221, if we want to use our own executor, we could use the supplyAsync method of CompletableFuture.
I made a demo here (sorry, it was done in a rush, so it's not perfect :/ ). You could check: Example of how to run performTask in different threads
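For reference, here is a rough sketch of the supplyAsync idea in plain Java. It is not the linked demo. The class SupplyAsyncDemo, the runAll helper, and this stand-in performTask (which fails for every multiple of 4 instead of failing randomly) are assumptions made up for the example. It runs the tasks on a pool we create ourselves and counts successes and failures with AtomicInteger, which is safe to update from several threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class SupplyAsyncDemo {

    // Hypothetical stand-in for performTask: fails for every multiple of 4.
    static Integer performTask(int i) {
        if (i % 4 == 0) {
            throw new IllegalStateException("task " + i + " failed");
        }
        return i;
    }

    // Runs taskCount tasks on our own pool and returns {successes, failures}.
    static int[] runAll(int taskCount, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger successes = new AtomicInteger();
        AtomicInteger failures = new AtomicInteger();
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                int record = i;
                futures.add(CompletableFuture
                        // The second argument makes the task run on our executor
                        .supplyAsync(() -> performTask(record), pool)
                        .handle((value, error) -> {
                            if (error != null) {
                                failures.incrementAndGet();
                            } else {
                                successes.incrementAndGet();
                            }
                            return value;
                        }));
            }
            // Wait for all tasks; handle() already absorbed the failures
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return new int[] {successes.get(), failures.get()};
    }

    public static void main(String[] args) {
        int[] counts = runAll(20, 4);
        // prints: successes=15, failures=5
        System.out.println("successes=" + counts[0] + ", failures=" + counts[1]);
    }
}
```

In a Spring application you would probably pass the ThreadPoolTaskExecutor bean as the second argument of supplyAsync instead of creating a pool per run; the pool created inside runAll here only keeps the sketch self-contained.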