Search code examples
javamultithreadingfuturereactorcompletable-future

Java CompletableFuture.complete() block


I have a problem when using CompletableFuture in java. I have 2 select requests those are filled when receiving responses from server.

In the connection thread (THREAD-1) (use reactor), I use:

if(hasException) {
   selectFuture.completeExceptionally(new ClientException(errorCode));
} else {
   System.out.println("Before complete future");
   selectFuture.complete(result);
   System.out.println("After complete future");
}

And in other thread (THREAD-2), I use:

CompleteFuture.allOf(allSelect).whenComplete((aVoid, throwable) -> {
   System.out.println("Receive all future");
   // Do sth here
});

My situation is that the system print out "Receive all future" but THREAD-1 is blocked when calling future.complete(result); It cannot get out of that command. If in THREAD-2, I use CompletableFuture.allOf(allOfSelect).get(), the THREAD-1 will run correctly. But using CompletableFuture.get() reduces performance, so I would like to use CompletableFuture.whenComplete().

Anyone can help me explain the cause of blocking?

Thanks!


Solution

  • The complete call triggers all dependent CompletionStages.

    So if you've previously registered a BiConsumer with whenComplete, the complete will invoke it in its calling thread. In your case, the call to complete will return when the BiConsumer you've passed to whenComplete finishes. This is described in the the class javadoc

    Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

    (by another caller is the opposite situation, where the thread calling whenComplete would actually apply the BiConsumer if the target CompletableFuture had already been completed.)

    Here's a small program to illustrate the behavior:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<String>();
        future.whenComplete((r, t) -> {
            System.out.println("before sleep, executed in thread " + Thread.currentThread());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("after sleep, executed in thread " + Thread.currentThread());
        });
    
        System.out.println(Thread.currentThread());
        future.complete("completed");
        System.out.println("done");
    }
    

    This will print

    Thread[main,5,main]
    before sleep, executed in thread Thread[main,5,main]
    after sleep, executed in thread Thread[main,5,main]
    done
    

    showing that the BiConsumer was applied in the main thread, the one that called complete.

    You can use whenCompleteAsync to force execution of the BiConsumer in a separate thread.

    [...] that executes the given action using this stage's default asynchronous execution facility when this stage completes.

    For example,

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<String>();
        CompletableFuture<?> done = future.whenCompleteAsync((r, t) -> {
            System.out.println("before sleep, executed in thread " + Thread.currentThread());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("after sleep, executed in thread " + Thread.currentThread());
        });
    
        System.out.println(Thread.currentThread());
        future.complete("completed");
        System.out.println("done");
        done.get();
    }
    

    will print

    Thread[main,5,main]
    done
    before sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
    after sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
    

    showing that the BiConsumer was applied in a separate thread.