Search code examples
javacompletable-futurethread-sleepvavr

Try waiting for a CompletableFuture


Is there a way of trying to wait for a CompletableFuture a certain amount of time before giving a different result back without cancelling the future after timing out?

I have a service (let's call it expensiveService) that runs off to do its own thing. It returns a result:

enum Result {
    COMPLETED,
    PROCESSING,
    FAILED
}

I'm willing to [block and] wait for it for a short amount of time (let's say 2 s). If it doesn't finish, I want to return a different result, but I want the service to carry on doing its own thing. It would be the client's job to then inquire as to whether the service is finished or not (e.g. through websockets or whatever).

I.e. we have the following cases:

  • expensiveService.processAndGet() takes 1 s and completes its future. It returns COMPLETED.
  • expensiveService.processAndGet() fails after 1 s. It returns FAILED.
  • expensiveService.processAndGet() takes 5 s and completes its future. It returns PROCESSING. If we ask another service for the result, we get COMPLETED.
  • expensiveService.processAndGet() fails after 5 s. It returns PROCESSING. If we ask another service for the result, we get FAILED.

In this specific case, we actually need to fetch the current result object anyway on a timeout, resulting in the following additional edge-case. This causes some issues with the solutions suggested below:

  • expensiveService.processAndGet() takes 2.01 s and completes its future. It returns either PROCESSING or COMPLETED.

I'm also using Vavr and am open to suggestions using Vavr's Future.

We have created three possible solutions which all have their own positives and negatives:

#1 Wait for another Future

CompletableFuture<Result> f = expensiveService.processAndGet();
return f.applyToEither(Future.of(() -> {
            Thread.sleep(2000);
            return null;
        }).map(v -> resultService.get(processId)).toCompletableFuture(),
        Function.identity());

Problems

  1. The second resultService is always called.
  2. We take up the entire Thread for 2 s.

#1a Wait for another Future that checks the first Future

CompletableFuture<Result> f = expensiveService.processAndGet();
return f.applyToEither(Future.of(() -> {
            int attempts = 0;
            int timeout = 20;
            while (!f.isDone() && attempts * timeout < 2000) {
                Thread.sleep(timeout);
                attempts++;
            }
            return null;
        }).map(v -> resultService.get(processId)).toCompletableFuture(),
        Function.identity());

Problems

  1. The second resultService is still always called.
  2. We need to pass the first Future to the second, which isn't so clean.

#2 Object.notify

Object monitor = new Object();
CompletableFuture<Upload> process = expensiveService.processAndGet();
synchronized (monitor) {
    process.whenComplete((r, e) -> {
        synchronized (monitor) {
            monitor.notifyAll();
        }
    });
    try {
        int attempts = 0;
        int timeout = 20;
        while (!process.isDone() && attempts * timeout < 2000) {
            monitor.wait(timeout);
            attempts++;
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
if (process.isDone()) {
    return process.toCompletableFuture();
} else {
    return CompletableFuture.completedFuture(resultService.get(processId));
}

Problems

  1. Complex code (potential for bugs, not as readable).

#3 Vavr's Future.await

return Future.of(() -> expensiveService.processAndGet()
        .await(2, TimeUnit.SECONDS)
        .recoverWith(e -> {
            if (e instanceof TimeoutException) {
                return Future.successful(resultService.get(processId));
            } else {
                return Future.failed(e);
            }
        })
        .toCompletableFuture();

Problems

  1. Needs a Future in a Future to avoid await cancelling the inner Future.
  2. Moving the first Future into a second breaks [legacy] code that relies on ThreadLocals.
  3. recoverWith and catching the TimeoutException isn't that elegant.

#4 CompletableFuture.orTimeout

return expensiveService.processAndGet()
        .orTimeout(2, TimeUnit.SECONDS)
        .<CompletableFuture<Upload>>handle((u, e) -> {
            if (u != null) {
                return CompletableFuture.completedFuture(u);
            } else if (e instanceof TimeoutException) {
                return CompletableFuture.completedFuture(resultService.get(processId));
            } else {
                return CompletableFuture.failedFuture(e);
            }
        })
        .thenCompose(Function.identity());

Problems

  1. Although in my case the processAndGet future is not cancelled, according to docs, it should be.
  2. The exception handling is not nice.

#5 CompletableFuture.completeOnTimeout

return expensiveService.processAndGet()
        .completeOnTimeout(null, 2, TimeUnit.SECONDS)
        .thenApply(u -> {
            if (u == null) {
                return resultService.get(processId);
            } else {
                return u;
            }
        });

Problems

  1. Although in my case the processAndGet future is not completed, according to docs, it should be.
  2. What if processAndGet wanted to return null as a different state?

All of these solutions have disadvantages and require extra code but this feels like something that should be supported either by CompletableFuture or Vavr's Future out-of-the-box. Is there a better way to do this?


Solution

  • It’s worth pointing out first, how CompletableFuture work (or why it is named like it is):

    CompletableFuture<?> f = CompletableFuture.supplyAsync(supplier, executionService);
    

    is basically equivalent to

    CompletableFuture<?> f = new CompletableFuture<>();
    executionService.execute(() -> {
        if(!f.isDone()) {
            try {
                f.complete(supplier.get());
            }
            catch(Throwable t) {
                f.completeExceptionally(t);
            }
        }
    });
    

    There is no connection from the CompletableFuture to the code being executed by the Executor, in fact, we can have an arbitrary number of ongoing completion attempts. The fact that a particular code is intended to complete a CompletableFuture instance, becomes apparent only when one of the completion methods is called.

    Therefore, the CompletableFuture can not affect the running operation in any way, this includes interrupting on cancellation or such alike. As the documentation of CompletableFuture says:

    Method cancel has the same effect as completeExceptionally(new CancellationException())

    So a cancellation is just another completion attempt, which will win if it is the first one, but not affect any other completion attempt.

    So orTimeout(long timeout, TimeUnit unit) is not much different in this regard. After the timeout elapsed, it will perform the equivalent to completeExceptionally(new TimeoutException()), which will win if no other completion attempt was faster, which will affect dependent stages, but not other ongoing completion attempts, e.g. what expensiveService.processAndGet() has initiated in your case.

    You can implement the desired operation like

    CompletableFuture<Upload> future = expensiveService.processAndGet();
    CompletableFuture<Upload> alternative = CompletableFuture.supplyAsync(
        () -> resultService.get(processId), CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS));
    return future.applyToEither(alternative, Function.identity())
        .whenComplete((u,t) -> alternative.cancel(false));
    

    With delayedExecutor we use the same facility as orTimeout and completeOnTimeout. It doesn’t evaluate the specified Supplier before the specified time or not at all when the cancellation in future.whenComplete is faster. The applyToEither will provide whichever result is available faster.

    This doesn’t complete the future on timeout, but as said, its completion wouldn’t affect the original computation anyway, so this would also work:

    CompletableFuture<Upload> future = expensiveService.processAndGet();
    CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS)
        .execute(() -> {
            if(!future.isDone()) future.complete(resultService.get(processId));
        });
    return future;
    

    this completes the future after the timeout, as said, without affecting ongoing computations, but providing the alternative result to the caller, but it wouldn’t propagate exceptions throw by resultService.get(processId) to the returned future.