rx-java2

RxJava: How to create a non-blocking Single from CompletableFuture<Long>


Given a repository class that returns a Single:

Single<SomeObject> find()

and a CompletableFuture that returns a Long:

CompletableFuture<Long> completableFuture

I would like to invoke the repository method first and based on the result, I need to invoke the completableFuture. This is the code I have:

repository.find()
          .flatMap(s -> {
              CompletableFuture<Long> completableFuture = serviceReturningCompletableFuture;

              return Single.fromFuture(completableFuture);

          }).subscribe(System.out::println);

The problem here is that Single.fromFuture blocks and therefore cannot be used.
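For context: Single.fromFuture accepts a plain java.util.concurrent.Future, and the only way to get a result out of a plain Future is the blocking get(). CompletableFuture additionally offers callbacks such as whenComplete, which is what a non-blocking bridge can build on. The JDK-only sketch below contrasts the two styles; the class name BlockingVsCallback, the observe helper and the values are made up for illustration and are not part of RxJava.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class BlockingVsCallback {

    // Registers a callback and returns immediately; the holder is filled
    // in later, by whichever thread completes the future.
    static AtomicReference<Long> observe(CompletableFuture<Long> future) {
        AtomicReference<Long> holder = new AtomicReference<>();
        future.whenComplete((value, error) -> {
            if (error == null) {
                holder.set(value);
            }
        });
        return holder;
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Long> future = new CompletableFuture<>();

        // Callback style: no value exists yet, but the caller is not stuck.
        AtomicReference<Long> holder = observe(future);
        System.out.println("before completion: " + holder.get());

        future.complete(42L);
        System.out.println("after completion: " + holder.get());

        // Blocking style: get() parks the caller until a value is available.
        // It returns at once here only because the future is already complete.
        System.out.println("get(): " + future.get());
    }
}
```

Had get() been called before complete(42L), the main thread would have waited indefinitely, which is the behaviour the question is trying to avoid.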

To work around this, I tried things like:

repository.find()
          .map(s -> {
              CompletableFuture<Long> completableFuture = new CompletableFuture<>();

              return Flowable.fromFuture(completableFuture);
          }).subscribe(System.out::println);

While this runs without blocking, the subscribe function prints the following instead of the number returned by the CompletableFuture:

io.reactivex.internal.operators.flowable.FlowableFromFuture@62ce978e
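The printed FlowableFromFuture@62ce978e looks like the default toString() of the wrapper itself: map passes whatever the lambda returns straight downstream, so the subscriber receives a Flowable rather than the Long inside it. Nothing subscribes to that inner Flowable, which is presumably also why nothing blocks. flatMap is the operator that subscribes to the inner source and forwards its value. The same distinction exists on CompletableFuture as thenApply versus thenCompose, which the JDK-only sketch below uses as an analogy (it is not RxJava code); lookup is a hypothetical stand-in for the service call.

```java
import java.util.concurrent.CompletableFuture;

class MapVersusFlatMap {

    // Hypothetical asynchronous lookup, already completed for simplicity.
    static CompletableFuture<Long> lookup(String key) {
        return CompletableFuture.completedFuture((long) key.length());
    }

    // thenApply behaves like map: the result is a future of a future.
    static CompletableFuture<CompletableFuture<Long>> nested(CompletableFuture<String> source) {
        return source.thenApply(MapVersusFlatMap::lookup);
    }

    // thenCompose behaves like flatMap: the inner future is unwrapped.
    static CompletableFuture<Long> flattened(CompletableFuture<String> source) {
        return source.thenCompose(MapVersusFlatMap::lookup);
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = CompletableFuture.completedFuture("abc");

        // Prints the inner CompletableFuture's toString(), not the number.
        nested(source).thenAccept(System.out::println);

        // Prints 3.
        flattened(source).thenAccept(System.out::println);
    }
}
```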

I also tried the non-blocking converter to Single from net.javacrumbs.future-converter:future-converter-rxjava-java8:1.2.0:

repository.find()
          .map(s -> {
              CompletableFuture<Long> completableFuture = new CompletableFuture<>();

              return toSingle(completableFuture);
          }).subscribe(System.out::println);

But this leads to pretty much the same output: net.javacrumbs.futureconverter.rxjavacommon.RxJavaFutureUtils$ValueSourceBackedSingle@3f1eebb8

What am I missing?


Solution

  • Some fiddling later, this helper method:

    public static <T> Single<T> toSingle(CompletableFuture<T> future) {
        return Single.create(subscriber ->
                future.whenComplete((result, error) -> {
                    if (error != null) {
                        subscriber.onError(error);
                    } else {
                        subscriber.onSuccess(result);
                    }
                }));
    }
    

    seems to do the trick:

    repository.find()
              .flatMap(s -> {
                  CompletableFuture<Long> completableFuture = serviceReturningCompletableFuture;
    
                  return toSingle(completableFuture);
    
              }).subscribe(System.out::println);
    

    As @akarnokd pointed out in a comment, there is also this library: https://github.com/akarnokd/RxJavaJdk8Interop#completionstage-to-rxjava which works in much the same way:

    repository.find()
              .flatMap(s -> {
                  CompletableFuture<Long> completableFuture = serviceReturningCompletableFuture;
    
                  return SingleInterop.fromFuture(completableFuture);
    
              }).subscribe(System.out::println);
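
The core of the toSingle helper above is the whenComplete callback, which routes the outcome to either onSuccess or onError. The JDK-only sketch below reproduces that bridge with a hypothetical Listener interface standing in for RxJava's emitter, so it runs without RxJava on the classpath. It also illustrates one detail: when the future handed to the bridge is a dependent stage (for example the result of thenApply), the failure arrives wrapped in a CompletionException. The sketch unwraps it; the original helper does not, and whether to do so is a design choice.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class FutureBridge {

    // Hypothetical stand-in for RxJava's SingleEmitter.
    interface Listener<T> {
        void onSuccess(T value);
        void onError(Throwable error);
    }

    // Forwards the future's outcome to the listener without blocking.
    static <T> void bridge(CompletableFuture<T> future, Listener<T> listener) {
        future.whenComplete((result, error) -> {
            if (error != null) {
                listener.onError(unwrap(error));
            } else {
                listener.onSuccess(result);
            }
        });
    }

    // Dependent stages report failures wrapped in CompletionException.
    static Throwable unwrap(Throwable error) {
        if (error instanceof CompletionException && error.getCause() != null) {
            return error.getCause();
        }
        return error;
    }

    public static void main(String[] args) {
        Listener<Long> printer = new Listener<Long>() {
            @Override public void onSuccess(Long value) { System.out.println("value: " + value); }
            @Override public void onError(Throwable error) { System.out.println("error: " + error.getMessage()); }
        };

        CompletableFuture<Long> ok = new CompletableFuture<>();
        bridge(ok, printer);
        ok.complete(5L); // prints "value: 5"

        CompletableFuture<Long> source = new CompletableFuture<>();
        CompletableFuture<Long> derived = source.thenApply(n -> n + 1);
        bridge(derived, printer);
        source.completeExceptionally(new IllegalStateException("boom")); // prints "error: boom"
    }
}
```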