How to execute Hystrix commands in parallel?


One method of a Spring service in my application makes requests to two other microservices. I would like to make these requests through Hystrix so that they are fault tolerant, and I would like to run them in parallel.

So far, I have implemented a HystrixObservableCommand for each call, and I use a CountDownLatch to wait until both commands have finished (or failed).

The current solution is very verbose. Is it possible to execute Hystrix commands in parallel using Observable features?

The desired solution would look like this, in pseudocode:

new LoadCustomerObservableCommand(customerClient, customerId).toObservable()
    .doOnError(throwable -> log.error("Failed to retrieve customer {} information for the reservation {}", customerId, reservationId, throwable))
    .doOnNext(customer -> myResponse.setCustomer(customer));

new GetTicketsObservableCommand(ticketsClient, reservationId).toObservable()
    .doOnError(throwable -> log.error("Failed to retrieve tickets for the reservation {}", reservationId, throwable))
    .doOnNext(tickets -> myResponse.setTickets(tickets));

final AtomicBoolean subRequestsFailed = new AtomicBoolean(false);

Observable.zip(customerObservable, ticketsObservable, (customer, tickets) -> null)
    .doOnError(throwable -> subRequestsFailed.set(true))
    .toBlocking()
    .first();

if (subRequestsFailed.get()) {
     throw new HystrixBadRequestException("One or more requests to submodules have failed");
}

return dto;

Unfortunately, this desired solution does not work, because the Hystrix commands are never executed.

My current solution is:

    // execute requests to sub modules in parallel
    final CountDownLatch cdl = new CountDownLatch(2);
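    // synchronized list, because the two callbacks may run on different threads
    final List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());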

    // load customer information
    final Observable<CustomerDTO> customerObservable = customerRxClient.loadCustomer(customerId);

    customerObservable
            .doOnError(throwable -> {
                log.error("Failed to retrieve customer {} information for the reservation {}", customerId, reservationId, throwable);

                // record the failure before releasing the latch, so the waiting thread always sees it
                failures.add(throwable);

                cdl.countDown();
            })
            .doOnCompleted(cdl::countDown)
            .subscribe(customer -> {
                dto.getReservationOwner().setBirthday(customer.getBirthday());
                dto.getReservationOwner().setCustomerId(customer.getCustomerId());
                dto.getReservationOwner().setCitizenship(customer.getCitizenship());
                dto.getReservationOwner().setEmail(customer.getEmail());
                dto.getReservationOwner().setFirstName(customer.getFirstName());
                dto.getReservationOwner().setGender(customer.getGender());
                dto.getReservationOwner().setLastName(customer.getLastName());
                dto.getReservationOwner().setPhone(ofNullable(customer.getPhone()).map(v -> mappingService.map(v, PhoneDTO.class)).orElse(null));
            });

    // load tickets
    final Observable<List<TicketDTO>> ticketsObservable = ticketsClient.getTickets(reservationId);

    ticketsObservable
            .doOnError(throwable -> {
                log.error("Failed to retrieve tickets for the reservation {}", reservationId, throwable);

                // record the failure before releasing the latch, so the waiting thread always sees it
                failures.add(throwable);

                cdl.countDown();
            })
            .doOnCompleted(cdl::countDown)
            .subscribe(tickets -> dto.setTickets(tickets.stream()
                    .map(ticket -> ReservationDTO.TicketDTO.builder()
                            .guestSeqN(ticket.getGuestSeqN())
                            .qr(ticket.getQr())
                            .qrText(ticket.getQrText())
                            .usedAt(ticket.getUsedAt())
                            .build())
                    .collect(toList())));

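    try {
        cdl.await();
    } catch (InterruptedException _ignore) {
        log.debug("Interrupted while waiting for the count down latch!", _ignore);
        // restore the interrupt flag so callers can still observe the interruption
        Thread.currentThread().interrupt();
    }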

    if (!failures.isEmpty()) {
        throw new HystrixBadRequestException("Request to submodule has failed");
    }

    return dto;

Solution

  • You have the right idea with your desired solution in that it uses the zip combinator. The reason the Hystrix commands are not executed in that solution is that the resulting Observable doesn't have a subscriber. From the documentation:

    toObservable() — returns a "cold" Observable that won’t subscribe to the underlying Observable until you subscribe to the resulting Observable

    Simply call the subscribe() method on the combined Observable:

    Observable.zip(customerObservable, ticketsObservable, (customer, tickets) -> null)
              .take(1)
              .doOnError(throwable -> subRequestsFailed.set(true))
              .subscribe();
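
To make the quoted "cold" behaviour concrete, here is a minimal plain-JDK sketch. It is a toy model, not the RxJava or Hystrix API: the names `Cold`, `defer`, `zip`, `blockingFirst` and `ColdDemo` are hypothetical and exist only for this illustration. The point is that composing two sources runs nothing; the work starts, here on two worker threads, only when something finally subscribes to the combined source.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Toy stand-in for a cold Observable (hypothetical, NOT the RxJava/Hystrix API). */
final class Cold<T> {
    private final Supplier<T> work;

    private Cold(Supplier<T> work) {
        this.work = work;
    }

    /** Wraps the work without running it. */
    static <T> Cold<T> defer(Supplier<T> work) {
        return new Cold<>(work);
    }

    /** Combines two cold sources into one; still nothing runs at this point. */
    static <A, B, R> Cold<R> zip(Cold<A> a, Cold<B> b, BiFunction<A, B, R> combine) {
        return new Cold<>(() -> {
            // Both sources are started only once the combined source is subscribed to.
            CompletableFuture<A> fa = CompletableFuture.supplyAsync(a.work);
            CompletableFuture<B> fb = CompletableFuture.supplyAsync(b.work);
            // join() rethrows a failure of either source as a CompletionException.
            return fa.thenCombine(fb, combine).join();
        });
    }

    /** "Subscribes": triggers the work and blocks until the result is available. */
    T blockingFirst() {
        return work.get();
    }
}

final class ColdDemo {
    public static void main(String[] args) {
        final AtomicInteger calls = new AtomicInteger();

        Cold<Integer> first = Cold.defer(() -> { calls.incrementAndGet(); return 1; });
        Cold<Integer> second = Cold.defer(() -> { calls.incrementAndGet(); return 2; });
        Cold<Integer> zipped = Cold.zip(first, second, Integer::sum);

        // Nothing has been subscribed to yet, so neither source has run.
        System.out.println("calls before subscribe: " + calls.get());

        // Subscribing to the combined source runs both underlying sources.
        System.out.println("result: " + zipped.blockingFirst());
        System.out.println("calls after subscribe: " + calls.get());
    }
}
```

In the sketch, `blockingFirst()` both triggers the work and waits for it, which is why the counters can be read safely right afterwards. With real Hystrix commands the same check belongs after whatever call you use to wait for the zipped Observable.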