Is there a way of trying to wait for a CompletableFuture a certain amount of time before giving a different result back without cancelling the future after timing out?
I have a service (let's call it expensiveService) that runs off to do its own thing. It returns a result:
enum Result {
    COMPLETED,
    PROCESSING,
    FAILED
}
I'm willing to [block and] wait for it for a short amount of time (let's say 2 s). If it doesn't finish, I want to return a different result, but I want the service to carry on doing its own thing. It would be the client's job to then inquire as to whether the service is finished or not (e.g. through websockets or whatever).
I.e. we have the following cases:
- expensiveService.processAndGet() takes 1 s and completes its future. It returns COMPLETED.
- expensiveService.processAndGet() fails after 1 s. It returns FAILED.
- expensiveService.processAndGet() takes 5 s and completes its future. It returns PROCESSING. If we ask another service for the result, we get COMPLETED.
- expensiveService.processAndGet() fails after 5 s. It returns PROCESSING. If we ask another service for the result, we get FAILED.
In this specific case, we actually need to fetch the current result object anyway on a timeout, resulting in the following additional edge case. This causes some issues with the solutions suggested below:
- expensiveService.processAndGet() takes 2.01 s and completes its future. It returns either PROCESSING or COMPLETED.
I'm also using Vavr and am open to suggestions using Vavr's Future.
We have created three possible solutions which all have their own positives and negatives:
CompletableFuture<Result> f = expensiveService.processAndGet();
return f.applyToEither(Future.of(() -> {
            Thread.sleep(2000);
            return null;
        }).map(v -> resultService.get(processId)).toCompletableFuture(),
        Function.identity());
resultService is always called.
CompletableFuture<Result> f = expensiveService.processAndGet();
return f.applyToEither(Future.of(() -> {
            int attempts = 0;
            int timeout = 20;
            while (!f.isDone() && attempts * timeout < 2000) {
                Thread.sleep(timeout);
                attempts++;
            }
            return null;
        }).map(v -> resultService.get(processId)).toCompletableFuture(),
        Function.identity());
resultService is still always called.
Object.notify
Object monitor = new Object();
CompletableFuture<Upload> process = expensiveService.processAndGet();
synchronized (monitor) {
    process.whenComplete((r, e) -> {
        synchronized (monitor) {
            monitor.notifyAll();
        }
    });
    try {
        int attempts = 0;
        int timeout = 20;
        while (!process.isDone() && attempts * timeout < 2000) {
            monitor.wait(timeout);
            attempts++;
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
if (process.isDone()) {
    return process.toCompletableFuture();
} else {
    return CompletableFuture.completedFuture(resultService.get(processId));
}
Future.await
return Future.fromCompletableFuture(expensiveService.processAndGet())
        .await(2, TimeUnit.SECONDS)
        .recoverWith(e -> {
            if (e instanceof TimeoutException) {
                return Future.successful(resultService.get(processId));
            } else {
                return Future.failed(e);
            }
        })
        .toCompletableFuture();
await cancelling the inner Future.
ThreadLocals.
recoverWith and catching the TimeoutException isn't that elegant.
CompletableFuture.orTimeout
return expensiveService.processAndGet()
        .orTimeout(2, TimeUnit.SECONDS)
        .<CompletableFuture<Upload>>handle((u, e) -> {
            if (u != null) {
                return CompletableFuture.completedFuture(u);
            } else if (e instanceof TimeoutException) {
                return CompletableFuture.completedFuture(resultService.get(processId));
            } else {
                return CompletableFuture.failedFuture(e);
            }
        })
        .thenCompose(Function.identity());
processAndGet future is not cancelled, according to docs, it should be.
CompletableFuture.completeOnTimeout
return expensiveService.processAndGet()
        .completeOnTimeout(null, 2, TimeUnit.SECONDS)
        .thenApply(u -> {
            if (u == null) {
                return resultService.get(processId);
            } else {
                return u;
            }
        });
processAndGet future is not completed, according to docs, it should be.
processAndGet wanted to return null as a different state?
All of these solutions have disadvantages and require extra code, but this feels like something that should be supported either by CompletableFuture or Vavr's Future out of the box. Is there a better way to do this?
It’s worth pointing out first how CompletableFuture works (or why it is named the way it is):
CompletableFuture<?> f = CompletableFuture.supplyAsync(supplier, executionService);
is basically equivalent to
CompletableFuture<?> f = new CompletableFuture<>();
executionService.execute(() -> {
    if(!f.isDone()) {
        try {
            f.complete(supplier.get());
        }
        catch(Throwable t) {
            f.completeExceptionally(t);
        }
    }
});
There is no connection from the CompletableFuture to the code being executed by the Executor; in fact, we can have an arbitrary number of ongoing completion attempts. The fact that a particular piece of code is intended to complete a CompletableFuture instance becomes apparent only when one of the completion methods is called.
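As a small illustration of competing completion attempts, here is a minimal sketch (the class and method names are made up for this example). It relies on `complete` and `completeExceptionally` returning `true` only for the call that actually transitions the future to a completed state:

```java
import java.util.concurrent.CompletableFuture;

public class CompletionAttempts {
    /** Runs three completion attempts against one future and reports what happened. */
    static String demo() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean first = f.complete("first");     // wins, the future was still incomplete
        boolean second = f.complete("second");   // ignored, the future is already completed
        boolean third = f.completeExceptionally( // ignored for the same reason
                new IllegalStateException("too late"));
        return f.join() + " " + first + " " + second + " " + third;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "first true false false"
    }
}
```

None of the losing attempts is "cancelled" in any sense; they simply have no effect on the future's result.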
Therefore, the CompletableFuture cannot affect the running operation in any way; this includes interrupting on cancellation and the like. As the documentation of CompletableFuture says:
Method cancel has the same effect as completeExceptionally(new CancellationException())
So a cancellation is just another completion attempt, which will win if it is the first one, but not affect any other completion attempt.
So orTimeout(long timeout, TimeUnit unit) is not much different in this regard. After the timeout has elapsed, it performs the equivalent of completeExceptionally(new TimeoutException()), which wins if no other completion attempt was faster. This affects dependent stages, but not other ongoing completion attempts, e.g. what expensiveService.processAndGet() has initiated in your case.
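To see this behaviour in isolation, the following sketch (Java 9+, all names invented for the example) starts a task, lets `orTimeout` fire while the task is still sleeping, and then checks that the task nevertheless ran to its end without being interrupted:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutDoesNotStopWork {
    /** Returns true if the future timed out AND the background task still finished. */
    static boolean workSurvivesTimeout() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
            started.countDown();
            try {
                Thread.sleep(300);          // stand-in for the expensive work
            } catch (InterruptedException e) {
                return "interrupted";       // would mean the timeout reached the task
            }
            finished.countDown();           // only reached if nobody interrupted us
            return "done";
        });
        try {
            started.await();                // make sure the task is really running
            f.orTimeout(50, TimeUnit.MILLISECONDS);
            boolean timedOut;
            try {
                f.join();
                timedOut = false;
            } catch (CompletionException e) {
                timedOut = e.getCause() instanceof TimeoutException;
            }
            return timedOut && finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(workSurvivesTimeout());
    }
}
```

The task's own attempt to complete the future with "done" simply loses against the earlier TimeoutException.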
You can implement the desired operation like
CompletableFuture<Upload> future = expensiveService.processAndGet();
CompletableFuture<Upload> alternative = CompletableFuture.supplyAsync(
    () -> resultService.get(processId), CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS));
return future.applyToEither(alternative, Function.identity())
    .whenComplete((u,t) -> alternative.cancel(false));
With delayedExecutor we use the same facility as orTimeout and completeOnTimeout. It doesn’t evaluate the specified Supplier before the specified time, or not at all when the cancellation in whenComplete is faster. The applyToEither will provide whichever result is available faster.
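For experimenting with this pattern outside the original services, here is a self-contained sketch. `withFallback` and `delayed` are hypothetical helper names, and the delayed futures merely stand in for `expensiveService.processAndGet()`; the fallback supplier stands in for `resultService.get(processId)`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

public class FallbackAfterTimeout {
    /** Yields the future's result, or the fallback's result if the timeout elapses first. */
    static <T> CompletableFuture<T> withFallback(
            CompletableFuture<T> future, Supplier<T> fallback, long timeout, TimeUnit unit) {
        CompletableFuture<T> alternative = CompletableFuture.supplyAsync(
                fallback, CompletableFuture.delayedExecutor(timeout, unit));
        return future.applyToEither(alternative, Function.identity())
                // if the original future won, this keeps the fallback from being evaluated
                .whenComplete((r, t) -> alternative.cancel(false));
    }

    /** Stand-in for a service call: a future completed with value after the given delay. */
    static <T> CompletableFuture<T> delayed(long millis, T value) {
        return CompletableFuture.supplyAsync(
                () -> value, CompletableFuture.delayedExecutor(millis, TimeUnit.MILLISECONDS));
    }

    public static void main(String[] args) {
        CompletableFuture<String> slow = delayed(800, "COMPLETED");
        String answer = withFallback(slow, () -> "PROCESSING", 100, TimeUnit.MILLISECONDS).join();
        System.out.println(answer);      // the fallback, since the timeout elapsed first
        System.out.println(slow.join()); // the original future still completes on its own
    }
}
```

Note that the original future is never touched here; only the derived future returned by `applyToEither` carries the fallback value.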
This doesn’t complete the future on timeout, but as said, its completion wouldn’t affect the original computation anyway, so this would also work:
CompletableFuture<Upload> future = expensiveService.processAndGet();
CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS)
    .execute(() -> {
        if(!future.isDone()) future.complete(resultService.get(processId));
    });
return future;
This completes the future after the timeout, as said, without affecting ongoing computations, while providing the alternative result to the caller. However, it wouldn’t propagate exceptions thrown by resultService.get(processId) to the returned future.
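If propagating such exceptions matters, one possible variation is to catch them in the delayed task and forward them via `completeExceptionally`. This is only a sketch; `completeAfter` is a hypothetical helper name and the fallback supplier stands in for `resultService.get(processId)`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class TimeoutCompletion {
    /**
     * After the timeout, tries to complete the given future with the fallback's result,
     * or with the fallback's exception if evaluating it fails.
     */
    static <T> CompletableFuture<T> completeAfter(
            CompletableFuture<T> future, Supplier<T> fallback, long timeout, TimeUnit unit) {
        CompletableFuture.delayedExecutor(timeout, unit).execute(() -> {
            if (future.isDone()) {
                return;                           // finished in time, skip the fallback
            }
            try {
                future.complete(fallback.get());  // no effect if the future completed meanwhile
            } catch (Throwable t) {
                future.completeExceptionally(t);  // forward the fallback's failure
            }
        });
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(
                completeAfter(pending, () -> "PROCESSING", 100, TimeUnit.MILLISECONDS).join());
    }
}
```

As with the shorter version, this completes the very future that the service handed out, so anyone else holding a reference to it will also observe the fallback result.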