I'd like to subclass CompletableFuture to override the default Executor
. That is, if a user invokes a method without specifying an Executor
, I want my own Executor
to get used instead of the one normally used by CompletableFuture
.
The Javadoc hints at the possibility of subclassing:
All CompletionStage methods are implemented independently of other public methods, so the behavior of one method is not impacted by overrides of others in subclasses.
How am I supposed to implement static methods like CompletableFuture.supplyAsync()
in the subclass if the underlying implementation depends on methods like internalComplete()
which is package-private?
How is one supposed to subclass CompletableFuture?
My user code needs to execute multiple tasks asynchronously using the same executor. For example: CompletableFuture.supplyAsync(..., executor).thenApplyAsync(..., executor).thenApplyAsync(..., executor)
. I'd like the custom CompletableFuture
implementation to use the first executor throughout all follow-up calls.
Starting with Java 9, there is built-in support for this task:
public class MyCompletableFuture<T> extends CompletableFuture<T> {
public static <T> CompletableFuture<T> supplyAsync(Supplier<T> s, Executor e) {
Objects.requireNonNull(s);
Objects.requireNonNull(e);
return new MyCompletableFuture<T>(e).completeAsync(s);
}
private final Executor executor;
public MyCompletableFuture(Executor executor) {
this.executor = executor;
}
@Override
public <U> CompletableFuture<U> newIncompleteFuture() {
return new MyCompletableFuture<>(executor);
}
@Override
public Executor defaultExecutor() {
return executor;
}
}
By overriding newIncompleteFuture()
, we can ensure that all then…
and when…
, etc. methods will return instances of MyCompletableFuture
, without overriding each of them.
The defaultExecutor()
method specifies the executor to be used by all async methods without Executor
parameter.
Conveniently, the non-static
method completeAsync
allows us to get the supplyAsync
behavior using an instance of our future implementation. The example above provides a static
method supplyAsync
which can be used similarly to the original one, i.e. MyCompletableFuture.supplyAsync(supplier, executor)
In Java 8, you have to override all methods whose behavior you want to alter, using the decoration pattern, which still doesn’t need to touch any of the CompletableFuture
’s inner workings.
import java.util.concurrent.*;
import java.util.function.*;
public class MyCompletableFuture<T> extends CompletableFuture<T> {
public static <T> CompletableFuture<T> supplyAsync(Supplier<T> s, Executor e){
return my(CompletableFuture.supplyAsync(s, e), e);
}
private static <T> CompletableFuture<T> my(CompletableFuture<T> f,Executor e){
MyCompletableFuture<T> my=new MyCompletableFuture<>(f, e);
f.whenComplete((v,t)-> {
if(t!=null) my.completeExceptionally(t); else my.complete(v);
});
return my;
}
private final CompletableFuture<T> baseFuture;
private final Executor executor;
MyCompletableFuture(CompletableFuture<T> base, Executor e) {
baseFuture=base;
executor=e;
}
private <T> CompletableFuture<T> my(CompletableFuture<T> base) {
return my(base, executor);
}
@Override
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return my(baseFuture.acceptEitherAsync(other, action, executor));
}
@Override
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return my(baseFuture.applyToEitherAsync(other, fn, executor));
}
@Override
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
return my(baseFuture.handleAsync(fn, executor));
}
@Override
public CompletableFuture<Void> runAfterBothAsync(
CompletionStage<?> other, Runnable action) {
return my(baseFuture.runAfterBothAsync(other, action, executor));
}
@Override
public CompletableFuture<Void> runAfterEitherAsync(
CompletionStage<?> other, Runnable action) {
return my(baseFuture.runAfterEitherAsync(other, action, executor));
}
@Override
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return my(baseFuture.thenAcceptAsync(action, executor));
}
@Override
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return my(baseFuture.thenAcceptBothAsync(other, action, executor));
}
@Override
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T, ? extends U> fn) {
return my(baseFuture.thenApplyAsync(fn, executor));
}
@Override
public <U, V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T, ? super U, ? extends V> fn) {
return my(baseFuture.thenCombineAsync(other, fn, executor));
}
@Override
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn) {
return my(baseFuture.thenComposeAsync(fn, executor));
}
@Override
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return my(baseFuture.thenRunAsync(action, executor));
}
@Override
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return my(baseFuture.whenCompleteAsync(action, executor));
}
}
Here is a simple test case which shows that it works as expected:
ScheduledExecutorService ses=Executors.newSingleThreadScheduledExecutor();
Executor e=r -> {
System.out.println("adding delay");
ses.schedule(r, 2, TimeUnit.SECONDS);
};
MyCompletableFuture.supplyAsync(()->"initial value", e)
.thenApplyAsync(String::hashCode)
.thenApplyAsync(Integer::toOctalString)
.thenAcceptAsync(System.out::println);