Search code examples
javarx-javaguavafuturerx-java2

Convert a ListenableFuture chain to the equivalent RxJava structure


Using Guava Listenable Futures

Assume I have the following class:

public class FooService {
    ListenableFuture<Foo> getFoo() {
        //code to create callable, then
        return listeningExecutorService.submit(fooCallable);
    }
}

and the following class:

public class BarService {
    ListenableFuture<Bar> getBar(Foo foo) {
        //code to create callable, then
        return listeningExecutorService.submit(barCallable);
    }
}

Note that getBar requires a Foo in the parameters.

If I want to chain these two operations together I would write a transformer function like this:

AsyncFunction<Foo, Bar> fooToBar = new AsyncFunction<Foo, Bar>() {

     @Override
     ListenableFuture<Bar> apply(Foo resultantFoo) {
         return barService.get(resultantFoo);
     }
};

and then apply the transformation like this:

public ListenableFuture<Bar> combinedFooToBar() {
     ListenableFuture<Foo> futureFoo = fooService.get();
     return Futures.transformAsync(futureFoo, fooToBar);
}

Question: what is the equivalent syntax for these classes and transformation function if we were to convert them into RxJava? Assume that we want to convert FooService and BarService into the appropriate RxJava structures. Assume we want to chain async tasks using the result of calling FooService as the parameter for BarService.

NB: I am just starting to learn about RxJava syntax. When I have finished studying the syntax I will attempt answer the question myself. However, in the meantime if anyone wants to answer they are welcome.


Solution

  • The Guava code translates into RxJava2 code as follows:

    FooService.java

    public class FooService {
    
        Observable<Foo> getFoo() {
            return Observable.fromCallable(new Callable<Foo>() {
                @Override
                public Foo call() throws Exception {
                    return new Foo();
                }
            });
        }
    }
    

    BarService.java

    public class BarService {
    
        Observable<Bar> getBar(final Foo foo) {
            return Observable.fromCallable(new Callable<Bar>() {
                @Override
                public Bar call() throws Exception {
                    return new Bar(foo);
                }
            });
        }
    }
    

    FooBarService.java

    public class FooBarService {
    
        private final FooService fooService;
        private final BarService barService;
    
        public FooBarService(FooService fooService, BarService barService) {
            this.fooService = fooService;
            this.barService = barService;
        }
    
        Observable<Bar> getFooBar() {
            return fooService.getFoo()
                    .concatMap(new Function<Foo, ObservableSource<? extends Bar>>() {
                        @Override
                        public ObservableSource<? extends Bar> apply(@NonNull Foo foo) throws Exception {
                            return barService.getBar(foo);
                        }
                    });
        }
    }
    

    Hence, concatMap and flatMap are similar to Futures.transformAsync and map is similar to Futures.transform (non-async).

    Note also this Github project called Future Converter for conversion between ListenableFuture and Observable.