Search code examples
javarx-javahystrixfeign

Am I misusing rxJava by converting an observable into a blocking observable?


My API makes about 100 downstream calls, in pairs, to two separate services. All responses need to be aggregated, before I can return my response to the client. I use hystrix-feign to make the HTTP calls.

I came up with what I believed was an elegant solution until on the rxJava docs I've found the following

BlockingObservable is a variety of Observable that provides blocking operators. It can be useful for testing and demo purposes, but is generally inappropriate for production applications (if you think you need to use a BlockingObservable this is usually a sign that you should rethink your design).

My code looks roughly as follows

List<Observable<C>> observables = new ArrayList<>();
for (RequestPair request : requests) {
    Observable<C> zipped = Observable.zip(
         feignClientA.sendRequest(request.A()),
         feignClientB.sendRequest(request.B()),
         (a, b) -> new C(a,b));
    observables.add(zipped);
}

Collection<D> apiResponse = = new ConcurrentLinkedQueue<>();

Observable
    .merge(observables)
    .toBlocking()
    .forEach(combinedResponse -> apiResponse.add(doSomeWork(combinedResponse)));

return apiResponse;

Few questions based on this setup:

  1. Is toBlocking() justified given my use case
  2. Am I correct in understanding that the actual HTTP calls do not get made until the main thread gets to the forEach()
  3. I've seen that the code in the forEach() block is executed by different threads, but I was not able to verify if there can be more than one thread in the forEach() block. Is the execution there concurrent?

Solution

  • A better option is to return the Observable to be consumed by other operators but you may get away with blocking code (It should, however, run on a background thread.)

    public Observable<D> getAll(Iterable<RequestPair> requests) {
        return Observable.from(requests)
        .flatMap(request ->
            Observable.zip(
                feignClientA.sendRequest(request.A()),
                feignClientB.sendRequest(request.B()),
                (a, b) -> new C(a,b)
            )
        , 8)  // maximum concurrent HTTP requests
        .map(both -> doSomeWork(both));
    }
    
    // for legacy users of the API
    public Collection<D> getAllBlocking(Iterable<RequestPair> requests) {
        return getAll(requests)
            .toList()
            .toBlocking()
            .first();
    }
    

    Am I correct in understanding that the actual HTTP calls do not get made until the main thread gets to the forEach()

    Yes, the forEach triggers the whole sequence of operations.

    I've seen that the code in the forEach() block is executed by different threads, but I was not able to verify if there can be more than one thread in the forEach() block. Is the execution there concurrent?

    Only one thread at a time is allowed to execute the lambda in forEach but you may indeed see different threads entering there.