My API makes about 100 downstream calls, in pairs, to two separate services. All responses need to be aggregated, before I can return my response to the client. I use hystrix-feign to make the HTTP calls.
I came up with what I believed was an elegant solution until on the rxJava docs I've found the following
BlockingObservable is a variety of Observable that provides blocking operators. It can be useful for testing and demo purposes, but is generally inappropriate for production applications (if you think you need to use a BlockingObservable this is usually a sign that you should rethink your design).
My code looks roughly as follows
List<Observable<C>> observables = new ArrayList<>();
for (RequestPair request : requests) {
Observable<C> zipped = Observable.zip(
feignClientA.sendRequest(request.A()),
feignClientB.sendRequest(request.B()),
(a, b) -> new C(a,b));
observables.add(zipped);
}
Collection<D> apiResponse = = new ConcurrentLinkedQueue<>();
Observable
.merge(observables)
.toBlocking()
.forEach(combinedResponse -> apiResponse.add(doSomeWork(combinedResponse)));
return apiResponse;
Few questions based on this setup:
A better option is to return the Observable
to be consumed by other operators but you may get away with blocking code (It should, however, run on a background thread.)
public Observable<D> getAll(Iterable<RequestPair> requests) {
return Observable.from(requests)
.flatMap(request ->
Observable.zip(
feignClientA.sendRequest(request.A()),
feignClientB.sendRequest(request.B()),
(a, b) -> new C(a,b)
)
, 8) // maximum concurrent HTTP requests
.map(both -> doSomeWork(both));
}
// for legacy users of the API
public Collection<D> getAllBlocking(Iterable<RequestPair> requests) {
return getAll(requests)
.toList()
.toBlocking()
.first();
}
Am I correct in understanding that the actual HTTP calls do not get made until the main thread gets to the forEach()
Yes, the forEach
triggers the whole sequence of operations.
I've seen that the code in the forEach() block is executed by different threads, but I was not able to verify if there can be more than one thread in the forEach() block. Is the execution there concurrent?
Only one thread at a time is allowed to execute the lambda in forEach
but you may indeed see different threads entering there.