Search code examples
javaasynchronoustimeoutfuture

Unexpected Behavior for CompletableFuture.allOf().orTimeout()


There are 2 ways to to force a CompletableFuture to timeout after a given amount of time:

I would expect them to behave the same. However, when applied to CompletableFuture.allOf(CompletableFuture<?>... cfs), these 2 ways of providing a timeout behave quite differently !

Basically, it seems that get() does what I would expect (it blocks the current thread until all futures have completed), whereas orTimeout() seems to behave very strangely (it unblocks the current thread as soon as it can after the first future has completed).

Here's some code to demo the behavior I am observing:

import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class AllOfWithTimeoutTest {

    public static final int TIMEOUT_IN_MILLIS = 100;

    @Test
    public void allOfOrTimeout1() throws InterruptedException, ExecutionException, TimeoutException {
        getAllOfFuture().get(TIMEOUT_IN_MILLIS, MILLISECONDS);
    }

    @Test
    public void allOfOrTimeout2() throws ExecutionException, InterruptedException {
        getAllOfFuture().orTimeout(TIMEOUT_IN_MILLIS, MILLISECONDS);
    }

    private CompletableFuture<Void> getAllOfFuture() {
        return CompletableFuture.allOf(
            CompletableFuture.runAsync(() -> sleep(1)),
            CompletableFuture.runAsync(() -> sleep(2)),
            CompletableFuture.runAsync(() -> sleep(3)),
            CompletableFuture.runAsync(() -> sleep(4)),
            CompletableFuture.runAsync(() -> sleep(5)),
            CompletableFuture.runAsync(() -> sleep(6)),
            CompletableFuture.runAsync(() -> sleep(7)),
            CompletableFuture.runAsync(() -> sleep(8))
        );
    }

    public static void sleep(int millis) {
        try {
            Thread.sleep(millis);
            System.out.format("Had a nap for %s milliseconds.\r\n", millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };
}

The printout of allOfOrTimeout1() is what I would expect:

Had a nap for 1 milliseconds.
Had a nap for 2 milliseconds.
Had a nap for 3 milliseconds.
Had a nap for 4 milliseconds.
Had a nap for 5 milliseconds.
Had a nap for 6 milliseconds.
Had a nap for 7 milliseconds.
Had a nap for 8 milliseconds.

The printout of allOfOrTimeout2() is NOT what I would expect, and varies slightly at every execution. It typically prints out between the first 2 and 5 lines, but never 8. Here's a version where it printed out only 2 lines:

Had a nap for 1 milliseconds.
Had a nap for 2 milliseconds.

Also, if I run the whole test in IntelliJ, I get some extra lines at the end:

Had a nap for 1 milliseconds.
Had a nap for 2 milliseconds.
Had a nap for 3 milliseconds.
Had a nap for 4 milliseconds.
Had a nap for 5 milliseconds.
Had a nap for 6 milliseconds.
Had a nap for 7 milliseconds.
Had a nap for 8 milliseconds.



Had a nap for 1 milliseconds.
Had a nap for 2 milliseconds.

Had a nap for 3 milliseconds.
Had a nap for 4 milliseconds.

Process finished with exit code 0
Had a nap for 

My questions are:

  1. Is this the expected behavior for orTimeout() ?
  2. If not, why is it doing this ?

Solution

  • I guess if you write it like this:

    public static void allOfOrTimeout2() {
        CompletableFuture<Void> future1 = getAllOfFutures();
        CompletableFuture<Void> future2 = future1.orTimeout(TIMEOUT_IN_MILLIS, MILLISECONDS);
    }
    

    and put yourself in the shoes of the main thread and think how it is supposed to execute allOfOrTimeout2(), things will be easier.

    main thread needs to execute allOfOrTimeout2. It goes into this method and returns from there a CompletableFuture<Void>. Does it wait for it to complete? Nope. Even the documentation says that, if you want to wait for this CompletableFuture to finish, you need to:

    Among the applications of this method is to await completion of a set of independent CompletableFutures before continuing a program, as in: CompletableFuture.allOf(c1, c2,c3).join();

    But you do not call that join, you return it from the method and move on.

    The execution of that CompletableFuture though, does start. It executes in threads from ForkJoinPool, via those runAsync (this is how supplyAsync for example works too: you dispatch to a different thread the actual work, and the thread that invoked supplyAsync moves on).

    main thread then moves on to this:

    CompletableFuture<Void> future2 = future1.orTimeout(TIMEOUT_IN_MILLIS, MILLISECONDS);
    

    It needs to call orTimeout and the documentation says:

    Exceptionally completes this CompletableFuture with TimeoutException if not otherwise completed before the given timeout.

    Is the future1 completed? This is again a timing issue, but most probably no (for it to be completed all those runAsync need to finish). So there is no timeout and the invocation of allOfOrTimeout2 is over.

    Now those threads that run the code in runAsync are called daemon threads, a VM does not have to wait for them to be over in order to shut down. So main thread is over, those threads are not stopping the VM, so everything ends.

    Since the entire output depends on how threads are getting scheduled, it is impossible to predict it. You can't even predict if future2 finishes before VM shuts down.


    Of course if you want predictable result: you either wait for future2 to be done or pass an executor to runAsync.