java multithreading asynchronous completable-future concurrent.futures

Multithreading in Java: tracking succeeded and failed tasks


I am trying to run an async method on a schedule, every 5 minutes, in Spring to process 1000s of tasks using 100 threads. At the end of every run I need to figure out how many tasks failed and how many succeeded. I tried CompletableFuture with the sample code below, but I am facing two main issues.

  1. If an exception occurs, the scheduler restarts without completing the run.
  2. How do I get the numbers of the succeeded and failed tasks after a run? At the end I would like to print something like: success tasks: [1,2,4,5], failed tasks: [9,10,7,8]

//ScheduledTask
public void processTask() {
    List<CompletableFuture<Integer>> futures = new ArrayList<>();
    for (int i = 0; i < 300; i++) {
        futures.add(service.performTask(i));
    }
    // Wait for every task; join() throws if any future completed exceptionally.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}

//MyAsyncService

@Async
public CompletableFuture<Integer> performTask(int i) {
    try {
        Thread.sleep(1000);
        final Thread currentThread = Thread.currentThread();
        final String oldName = currentThread.getName();
        currentThread.setName(oldName + "-" + i);
        // Divide i + 10 by a random number between 0 and 10 so that some tasks fail randomly (division by zero).
        int test = (int) (i + 10) / RandomNumber(0, 10);
        return CompletableFuture.completedFuture(i);
    } catch (Exception e) {
        // Report the failure through the future instead of throwing.
        CompletableFuture<Integer> f = new CompletableFuture<>();
        f.completeExceptionally(e);
        return f;
    }
}

//MyAsyncConfig

public Executor getAsyncExecutor() {
    final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();

    threadPoolTaskExecutor.setThreadNamePrefix("async-thread-");
    threadPoolTaskExecutor.setCorePoolSize(100);
    threadPoolTaskExecutor.setMaxPoolSize(300);

    threadPoolTaskExecutor.initialize();

    return threadPoolTaskExecutor;
}

Solution

  • Hmm, I think we still need the join() to wait for the whole set of CompletableFutures to complete before the scheduled task returns :)

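    Another thing: to make sure the scheduled task runs to completion, we need to use exceptionally() or handle() to catch the exception of each future. Otherwise a single failed task makes allOf(...).join() throw a CompletionException.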

        public void processTask() {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            // The callbacks may run on different threads, so use thread-safe lists.
            List<Integer> successes = Collections.synchronizedList(new ArrayList<>());
            List<Integer> failures = Collections.synchronizedList(new ArrayList<>());
            for (int i = 0; i < 10; i++) {
                int record = i;
                // handle() runs for both outcomes, so the resulting future never fails.
                CompletableFuture<Integer> integerCompletableFuture = performTask(i).handle((integer, throwable) -> {
                    if (throwable != null) {
                        failures.add(record);
                    } else {
                        successes.add(record);
                    }
                    return integer;
                });

                futures.add(integerCompletableFuture);
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            log.info("successes: {}", successes);
            log.info("failures: {}", failures);
        }
    

    This will print something like this:

    [main] INFO rxjava.CompletableFutureConcurrency - successes: [0, 1, 3, 4, 5, 6, 7, 9]
    [main] INFO rxjava.CompletableFutureConcurrency - failures: [2, 8]
    

    Or something like this, if you don't need the success records:

        public void processTask() {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            // The callbacks may run on different threads, so use a thread-safe list.
            List<Integer> failures = Collections.synchronizedList(new ArrayList<>());
            for (int i = 0; i < 10; i++) {
                int record = i;
                // exceptionally() runs only on failure and replaces it with a normal result.
                CompletableFuture<Integer> integerCompletableFuture = performTask(i)
                        .exceptionally(throwable -> {
                            failures.add(record);
                            return record;
                        });
                futures.add(integerCompletableFuture);
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            log.info("failures: {}", failures);
        }
    

    This will print something like this:

    [main] INFO rxjava.CompletableFutureConcurrency - failures: [2, 8]
    

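    PS: Here I used int record = i because a lambda can only capture local variables that are effectively final, and the loop variable i is modified on every iteration. Try using i directly inside the lambda to see the compiler error ;)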
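
To illustrate the point about exceptionally() and handle(), here is a minimal, self-contained sketch of what happens at the join() when a failed future is left unhandled. The class name JoinFailureDemo, the performTask stand-in (which fails deterministically for multiples of 3 instead of using @Async and a random divisor), and the -1 sentinel are all assumptions made for illustration; they are not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class JoinFailureDemo {

    // Hypothetical stand-in for performTask: fails for every multiple of 3.
    static CompletableFuture<Integer> performTask(int i) {
        return CompletableFuture.supplyAsync(() -> {
            if (i % 3 == 0) {
                throw new IllegalStateException("task " + i + " failed");
            }
            return i;
        });
    }

    // Returns true if join() on the combined future threw a CompletionException.
    static boolean joinThrows(boolean handleFailures) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            CompletableFuture<Integer> f = performTask(i);
            if (handleFailures) {
                // Replace a failure with a sentinel so the future completes normally.
                f = f.exceptionally(t -> -1);
            }
            futures.add(f);
        }
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("unhandled: join throws = " + joinThrows(false)); // true
        System.out.println("handled: join throws = " + joinThrows(true));    // false
    }
}
```

In the unhandled case the CompletionException would propagate out of processTask(), which is probably what shows up in the question as the scheduled run ending before it completes.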

    Updated on 26.03.2022

    The async execution of performTask is handled by the @Async annotation. As asked by @user3134221, if we want to use our own executor, we can use CompletableFuture's supplyAsync, which accepts an Executor as its second argument.

    CompletableFuture.supplyAsync

    I made a demo here (sorry, it was done in a rush and is not perfect :/ ); you can check Example of how to run performTask in different threads.
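
As a rough sketch of the supplyAsync idea mentioned above, independent of the linked demo, the following runs each task on a dedicated pool and records the outcome per task. The names SupplyAsyncDemo, doWork and runBatch are hypothetical, the pool is a plain Executors.newFixedThreadPool rather than Spring's ThreadPoolTaskExecutor, and the failure rule (multiples of 4 fail) is chosen only so the result is predictable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SupplyAsyncDemo {

    // Hypothetical task body: fails for multiples of 4, otherwise returns its input.
    static int doWork(int i) {
        if (i % 4 == 0) {
            throw new IllegalStateException("task " + i + " failed");
        }
        return i;
    }

    // Runs taskCount tasks on our own pool; returns [successes, failures], each sorted.
    static List<List<Integer>> runBatch(int taskCount, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        // Callbacks run on pool threads, so the lists must be thread-safe.
        List<Integer> successes = Collections.synchronizedList(new ArrayList<>());
        List<Integer> failures = Collections.synchronizedList(new ArrayList<>());
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                final int record = i;
                // The second argument selects the executor that runs the task.
                CompletableFuture<Integer> tracked = CompletableFuture
                        .supplyAsync(() -> doWork(record), pool)
                        .handle((result, error) -> {
                            if (error != null) {
                                failures.add(record);
                            } else {
                                successes.add(record);
                            }
                            return result;
                        });
                futures.add(tracked);
            }
            // Every future was passed through handle(), so join() does not throw here.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        List<Integer> ok = new ArrayList<>(successes);
        List<Integer> failed = new ArrayList<>(failures);
        Collections.sort(ok);      // completion order is not deterministic
        Collections.sort(failed);
        return Arrays.asList(ok, failed);
    }

    public static void main(String[] args) {
        List<List<Integer>> result = runBatch(10, 4);
        System.out.println("successes: " + result.get(0)); // [1, 2, 3, 5, 6, 7, 9]
        System.out.println("failures: " + result.get(1));  // [0, 4, 8]
    }
}
```

The lists are sorted before printing only because tasks finish in an unpredictable order. In a Spring application the same pattern should work with an injected executor in place of the local pool, though that wiring is not shown here.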