Search code examples
javareactive-programmingrx-javavert.xvertx3

Vertx Future does not wait


Since I´m using Vertx 3.1 in my stack, I was thinking to use the Future feature that the tools brings, but after read the API seems pretty limited to me. I cannot even find the way to make the the future wait for an Observable. Here my code

          public Observable<CommitToOrderCommand> validateProductRestrictions(CommitToOrderCommand cmd) {
    Future<Observable<CommitToOrderCommand>> future = Future.future();
    orderRepository.getOrder(cmd, cmd.orderId)
                   .flatMap(order -> validateOrderProducts(cmd, order))
                   .subscribe(map -> checkMapValues(map, future, cmd));
     Observable<CommitToOrderCommand> result = future.result();
    if(errorFound){
        throw MAX_QUANTITY_PRODUCT_EXCEED.create("Fail"/*restrictions.getBulkBuyLimit().getDescription())*/);
    }
    return result;
}

private void checkMapValues(Multimap<String, BigDecimal> totalUnitByRestrictions, Future<Observable<CommitToOrderCommand>> future,
                            CommitToOrderCommand cmd) {
    for (String restrictionName : totalUnitByRestrictions.keySet()) {
        Restrictions restrictions = Restrictions.valueOf(restrictionName);
        if (totalUnitByRestrictions.get(restrictionName)
                                   .stream()
                                   .reduce(BigDecimal.ZERO, BigDecimal::add)
                                   .compareTo(restrictions.getBulkBuyLimit()
                                                          .getMaxQuantity()) == 1) {
            errorFound = true;
        }
    }
    future.complete(Observable.just(cmd));
}

In the onComplete of my first Observable I´m checking the results, and after finish is when I finish the future to unblock the operation. But I´m looking that future.result is not block until future.complete is invoke as I was expecting. Instead is just returning null.

Any idea what´s wrong here?

Regards.


Solution

  • The vertx future doesn't block but rather work with a handler that is invoked when a result has been injected (see setHandler and isComplete).

    If the outer layer of code requires an Observable, you don't need to wrap it in a Future, just return Observable<T>. Future<Observable<T>> doesn't make much sense, you're mixing two ways of doing async results.

    Note that there are ways to collapse an Observable into a Future, but the difficulty is that an Observable may emit several items whereas a Future can hold only a single item. You already took care of that by collecting your results into a single emission of map.

    Since this Observable only ever emits one item, if you want a Future out of it you should subscribe to it and call future.complete(yourMap) in the onNext method. Also define a onError handler that will call future.fail.