Tags: java, java-8, executorservice, completable-future

Java 8 multithreading: How can I achieve parallelism along with a timeout for individual threads?


Summary of what I want to achieve:

I want to execute N tasks in parallel such that no individual task runs for more than two seconds (tasks that exceed the limit can be marked as failed). The result should contain the output of the successful tasks and a failed status for the others. A timeout in one task should not short-circuit the rest, i.e., the other tasks should keep running.


Note: I am restricted to Java 8.

I referenced this article for parallel processing. I am doing a similar kind of parallel processing as given in the example in this article:

public void parallelProcessing() {
    try {
        ExecutorService executorService = Executors.newWorkStealingPool(10);

        List<CompletableFuture<Integer>> futuresList = new ArrayList<CompletableFuture<Integer>>();
        futuresList.add(CompletableFuture.supplyAsync(()->(addFun1(10, 5)), executorService));
        futuresList.add(CompletableFuture.supplyAsync(()->(subFun1(10, 5)), executorService));
        futuresList.add(CompletableFuture.supplyAsync(()->(mulFun1(10, 5)), executorService));

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[futuresList.size()]));
        CompletableFuture<List<Integer>> allCompletableFuture = allFutures.thenApply(future -> futuresList.stream().map(completableFuture -> completableFuture.join())
                .collect(Collectors.toList()));
        // allCompletableFuture is already a CompletableFuture, so no conversion or cast is needed.
        List<Integer> finalList = allCompletableFuture.get();
    } catch (Exception ex) {
        // Do not swallow failures silently; at least log them.
        ex.printStackTrace();
    }
}


public static Integer addFun1(int a, int b) {
    System.out.println(Thread.currentThread().getName());

    for (int i = 0; i < 10; i++) {

        System.out.print(Thread.currentThread().getName() + i);
    }

    return a + b;
}

public static Integer subFun1(int a, int b) {

    System.out.println(Thread.currentThread().getName());

    for (int i = 0; i < 10; i++) {

        System.out.print(Thread.currentThread().getName() + i);
    }

    return a - b;
}


public static Integer mulFun1(int a, int b) {

    System.out.println(Thread.currentThread().getName());

    for (int i = 0; i < 10; i++) {

        System.out.print(Thread.currentThread().getName() + i);
    }

    return a * b;
}

This works fine, but I want to set a timeout for each individual thread. I know I can use the overloaded get method in the last line, but that would set the timeout for the combined futures, right? For example, if no individual thread should be blocked for more than 2 seconds and I set a 2-second timeout in the last line, it will be a combined timeout, right?

get(long timeout, TimeUnit unit)

Here's what I want to achieve as a final outcome:

Suppose there are five threads: four complete on time and one times out (because it runs for more than two seconds). In this case, I want the result to contain the output of the four threads and an error for the fifth.

My input/output format is as follows:

Sample input: List<Input>, where each item is run in a separate thread and each input has a uniqueIdentifier.

Sample output: List<Output> such that:

Output :{
    uniqueIdentifier: // Same as input to map for which input this output was generated
    result: success/fail // This Field I want to add. Currently it's not there
    data: {
        // From output, e.g., addFun1 and subFun1
    }
}

Solution

  • The semantics of what you want to achieve matter a great deal. On one hand, you say that you want an alternative to orTimeout for Java 8; on the other hand, you imply that you want to abandon the execution of a CompletableFuture if it runs past a certain threshold.

    These are very different things, because the documentation of orTimeout says:

    Exceptionally completes this CompletableFuture with a TimeoutException if not otherwise completed before the given timeout.

    So something like:

    CompletableFuture<Integer> addAsy =
        supplyAsync(() -> addFun1(10,5), executorService)
        .orTimeout(5, TimeUnit.MILLISECONDS);
    

    will result in a CompletableFuture that is completed exceptionally (assuming that addFun1 takes more than 5 ms). At the same time, this:

    CompletableFuture<Void> allFutures = CompletableFuture
            .allOf(futuresList.toArray(new CompletableFuture[0]));
    

    as the documentation of allOf states:

    ... If any of the given CompletableFutures complete exceptionally, then the returned CompletableFuture also does so...

    means that allFutures is a CompletableFuture that is completed exceptionally too (because addAsy is).

    Now, you have this:

        CompletableFuture<List<Integer>> allCompletableFuture = allFutures.thenApply(future -> {
            return futuresList.stream().map(CompletableFuture::join)
                .collect(Collectors.toList());
        });
    

    And again, the documentation of thenApply says:

    Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function.

    Your allFutures did not complete normally, so the function passed to thenApply is never called.
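
The skipped callback can be observed directly. The following is a minimal sketch, not part of the original code: the class name AllOfDemo and the simulated timeout (a future completed by hand with a TimeoutException) are assumptions made for illustration. It suggests that the function passed to thenApply does not run once allOf has completed exceptionally, whereas handle runs in both cases.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

class AllOfDemo {

    static String run() {
        CompletableFuture<Integer> ok = CompletableFuture.completedFuture(15);
        CompletableFuture<Integer> timedOut = new CompletableFuture<>();
        // Simulate what a timeout does: complete the future exceptionally.
        timedOut.completeExceptionally(new TimeoutException("simulated"));

        // allOf completes exceptionally because one of its inputs did.
        CompletableFuture<Void> all = CompletableFuture.allOf(ok, timedOut);

        boolean[] thenApplyRan = {false};
        String outcome = all
                // Only runs on normal completion, so it is skipped here.
                .thenApply(v -> { thenApplyRan[0] = true; return "applied"; })
                // Runs on both normal and exceptional completion.
                .handle((value, error) -> error == null ? value : "skipped")
                .join();
        return outcome + ":" + thenApplyRan[0];
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "skipped:false"
    }
}
```

Attaching handle (or exceptionally) after allOf is therefore one way to keep the pipeline going when a single future fails.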

    So you need to understand what exactly you want to achieve. For a backport of orTimeout you could start by looking here.
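
One possible shape for such a backport on Java 8 is sketched below. It is an illustration under stated assumptions, not the JDK implementation: the class TimeoutBackport, the static helper orTimeout, and the demo method are hypothetical names. The idea is to schedule a task on a ScheduledExecutorService that completes the future exceptionally with a TimeoutException, and to cancel that task if the future finishes first.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutBackport {

    // A single daemon thread suffices: it only completes futures, it runs no tasks.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timeout-scheduler");
                t.setDaemon(true);
                return t;
            });

    // Hypothetical static stand-in for the Java 9 instance method orTimeout.
    static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        ScheduledFuture<?> timer = SCHEDULER.schedule(
                () -> { future.completeExceptionally(new TimeoutException("timed out")); },
                timeout, unit);
        // Cancel the pending timer once the future completes, normally or not.
        future.whenComplete((value, error) -> timer.cancel(false));
        return future;
    }

    // Demo: a future that never completes times out; a completed one keeps its value.
    static String demo() {
        CompletableFuture<Integer> never =
                orTimeout(new CompletableFuture<Integer>(), 50, TimeUnit.MILLISECONDS);
        CompletableFuture<Integer> done =
                orTimeout(CompletableFuture.completedFuture(15), 2, TimeUnit.SECONDS);
        String first = never
                .handle((v, e) -> e instanceof TimeoutException ? "timeout" : "value")
                .join();
        return first + "," + done.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "timeout,15"
    }
}
```

Note that this only completes the future; the task that was feeding it keeps running on its thread until it finishes by itself, which is exactly the semantic difference described above.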


    You still need some kind of backport of orTimeout. The code below uses the method as if it already existed:

    static void parallelProcessing() throws Exception {
    
        ExecutorService executorService = Executors.newFixedThreadPool(10);
    
        List<CompletableFuture<Integer>> futuresList = new ArrayList<>();
        futuresList.add(CompletableFuture.supplyAsync(() -> addFun1(10,5), executorService).orTimeout(2, TimeUnit.SECONDS));
        futuresList.add(CompletableFuture.supplyAsync(() -> subFun1(10,5), executorService));
        futuresList.add(CompletableFuture.supplyAsync(() -> mulFun1(10,5), executorService));
    
    
        CompletableFuture<Void> all = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0]));
        Map<Boolean, List<CompletableFuture<Integer>>> map =
                all.thenApply(x -> both(futuresList)).exceptionally(x -> both(futuresList)).get();
    
        List<CompletableFuture<Integer>> failed = map.get(Boolean.TRUE);
        List<CompletableFuture<Integer>> ok = map.get(Boolean.FALSE);
    
        System.out.println("failed = " + failed.size());
        System.out.println("ok = " + ok.size());
    
        // Release the pool's non-daemon threads so the JVM can exit.
        executorService.shutdown();
    }
    
    private static Map<Boolean, List<CompletableFuture<Integer>>> both(
            List<CompletableFuture<Integer>> futuresList) {
        return futuresList.stream().collect(Collectors.partitioningBy(
                        CompletableFuture::isCompletedExceptionally
                ));
    }
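
To get from the partitioned futures to the per-input result asked for in the question, each future can be mapped to its own success/fail record before the futures are combined. The sketch below runs on plain Java 8 and is only an illustration: the Output class, the runAll and demo methods, the task names, and the 200 ms limit used to keep the demo short are all assumptions, and the scheduled completeExceptionally call stands in for orTimeout.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class PerTaskTimeout {

    // Hypothetical result type, modelled on the Output shape in the question.
    static final class Output {
        final String uniqueIdentifier;
        final String result; // "success" or "fail"
        final Integer data;  // null when the task failed

        Output(String uniqueIdentifier, String result, Integer data) {
            this.uniqueIdentifier = uniqueIdentifier;
            this.result = result;
            this.data = data;
        }

        @Override
        public String toString() {
            return uniqueIdentifier + "=" + result + (data == null ? "" : "(" + data + ")");
        }
    }

    static List<Output> runAll(Map<String, Supplier<Integer>> tasks, long timeout, TimeUnit unit) {
        // One thread per task, so every task starts immediately.
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            List<CompletableFuture<Output>> futures = new ArrayList<>();
            for (Map.Entry<String, Supplier<Integer>> task : tasks.entrySet()) {
                String id = task.getKey();
                CompletableFuture<Integer> future = CompletableFuture.supplyAsync(task.getValue(), pool);
                // Java 8 stand-in for orTimeout: fail the future when the deadline passes.
                timer.schedule(() -> { future.completeExceptionally(new TimeoutException(id)); },
                        timeout, unit);
                // handle maps both outcomes to a normal Output, so allOf below cannot fail.
                futures.add(future.handle((value, error) -> error == null
                        ? new Output(id, "success", value)
                        : new Output(id, "fail", null)));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            timer.shutdownNow();
            pool.shutdownNow(); // interrupts tasks that are still running
        }
    }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static String demo() {
        Map<String, Supplier<Integer>> tasks = new LinkedHashMap<>();
        tasks.put("add", () -> 10 + 5);
        tasks.put("slow", () -> { pause(2000); return 10 - 5; });
        tasks.put("mul", () -> 10 * 5);
        return runAll(tasks, 200, TimeUnit.MILLISECONDS).toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[add=success(15), slow=fail, mul=success(50)]"
    }
}
```

Because every future is converted with handle before allOf sees it, one timeout does not affect the other tasks. The deadline here is measured from the moment the task is submitted, and a task that throws is reported as failed in the same way as one that times out.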