Given a repository class with a method that returns a Single:
Single<SomeObject> find()
and a CompletableFuture which returns a Long:
CompletableFuture<Long> completableFuture
I would like to invoke the repository method first and then, based on its result, invoke the CompletableFuture. This is the code I have:
repository.find()
    .flatMap(s -> {
        CompletableFuture<Long> completableFuture = serviceReturningCompletableFuture;
        return Single.fromFuture(completableFuture);
    }).subscribe(System.out::println);
The problem here is that Single.fromFuture blocks and therefore cannot be used.
To work around this, I tried things like:
repository.find()
    .map(s -> {
        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
        return Flowable.fromFuture(completableFuture);
    }).subscribe(System.out::println);
While this works without blocking, the subscriber prints the following instead of the number returned by the CompletableFuture:
io.reactivex.internal.operators.flowable.FlowableFromFuture@62ce978e
I also tried the non-blocking converter to Single from net.javacrumbs.future-converter:future-converter-rxjava-java8:1.2.0:
repository.find()
    .map(s -> {
        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
        return toSingle(completableFuture);
    }).subscribe(System.out::println);
but this leads to much the same output:
net.javacrumbs.futureconverter.rxjavacommon.RxJavaFutureUtils$ValueSourceBackedSingle@3f1eebb8
What am I missing?
After some fiddling, this helper method:
public static <T> Single<T> toSingle(CompletableFuture<T> future) {
    return Single.create(subscriber ->
        // whenComplete registers a callback instead of blocking on the future
        future.whenComplete((result, error) -> {
            if (error != null) {
                subscriber.onError(error);
            } else {
                subscriber.onSuccess(result);
            }
        }));
}
seems to do the trick:
repository.find()
    .flatMap(s -> {
        CompletableFuture<Long> completableFuture = serviceReturningCompletableFuture;
        return toSingle(completableFuture);
    }).subscribe(System.out::println);
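To illustrate why the whenComplete-based helper does not block, here is a minimal sketch that uses only java.util.concurrent and no RxJava. The class name WhenCompleteDemo and the events list are made up for illustration; the point is only that whenComplete registers a callback and returns immediately, and the callback fires once the future is completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original code.
final class WhenCompleteDemo {

    // Records the order in which things happen around whenComplete.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        CompletableFuture<Long> future = new CompletableFuture<>();

        // Registers the callback; the future has no value yet, so nothing runs here.
        future.whenComplete((result, error) -> {
            if (error != null) {
                events.add("error: " + error.getMessage());
            } else {
                events.add("value: " + result);
            }
        });
        events.add("registered"); // reached without waiting for the future

        // Completing the future triggers the callback on the completing thread.
        future.complete(42L);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [registered, value: 42]
    }
}
```

A call to future.get() in place of whenComplete would instead wait at that line until some other thread completed the future.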
As @akarnokd pointed out in the comments, there is also this library: https://github.com/akarnokd/RxJavaJdk8Interop#completionstage-to-rxjava which works in much the same way:
repository.find()
    .flatMap(s -> {
        CompletableFuture<Long> completableFuture = serviceReturningCompletableFuture;
        return SingleInterop.fromFuture(completableFuture);
    }).subscribe(System.out::println);
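The earlier attempts printed a wrapper object because they used map, which passes the inner source downstream as a value, whereas flatMap subscribes to it and emits its result. The same distinction exists on CompletableFuture as thenApply versus thenCompose, so it can be sketched without RxJava. This is only an analogy: the class MapVsFlatMapDemo and the service method are hypothetical stand-ins, not the code from the question.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; an analogy for map vs. flatMap, not RxJava itself.
final class MapVsFlatMapDemo {

    // Stand-in for a service that returns a CompletableFuture<Long>.
    static CompletableFuture<Long> service(String s) {
        return CompletableFuture.completedFuture((long) s.length());
    }

    // Like map: the inner future becomes the value, so the result is nested.
    static CompletableFuture<CompletableFuture<Long>> mapped() {
        return CompletableFuture.completedFuture("abc")
                .thenApply(MapVsFlatMapDemo::service);
    }

    // Like flatMap: the inner future is unwrapped into the outer one.
    static CompletableFuture<Long> flatMapped() {
        return CompletableFuture.completedFuture("abc")
                .thenCompose(MapVsFlatMapDemo::service);
    }

    public static void main(String[] args) {
        System.out.println(mapped().join());     // prints the inner future object, not the number
        System.out.println(flatMapped().join()); // prints 3
    }
}
```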