Search code examples
javaconcurrencypromisevolatilecompletable-future

Sequential call CompletableFutures


I have infinite queue of promises(completablefuture) as an input. The goal is to run promises one by one till condition fulfilled on the result and stop processing and return result from current promise.

My iterative solution looks like that:

volatile boolean shouldKeepReading = true;
....
CompletableFuture<Integer> result = promisesQueue.poll().get();

while (shouldKeepReading) {
      result = result.thenCompose(res -> {
        if (conditionPass(res)) {
          shouldKeepReading = false;
          return CompletableFuture.completedFuture(0));
        } else {
          if (shouldKeepReading) {
            return promisesQueue.poll().get();
          } else {
            return CompletableFuture.completedFuture(0));
          }
        }
      });
  1. I used infinite loop with volatile flag to control processing. Volatile guarantee memory visibility to all readers. Once condition met control flag will be set to false in order to stop processing.
  2. I used double check before read next item.
if (shouldKeepReading) {
            return promisesQueue.poll().get();

The code seems works correct but noticed that volatile keyword is not needed here, it doesn't change the processing. Why ? Have I miss something ? Do you see any problems with that code ?


Solution

  • The HotSpot JVM is rather conservative. It’s too easy to reproducibly see writes made by other threads as side effect of other, unrelated, reads and writes with stronger memory guarantees.

    For example, in your case thenCompose checks the completion status of the future whereas the implementation specific caller of the function will change the completion status. This may appear to have the desired effect even when the status is “not completed” in which case there’s no formal happens-before relationship or when actually calling thenApply on the next chained future which also doesn’t establish a happens-before relationship as it’s a different variable.

    In other words, it may appear to work with this JVM implementation without volatile but is not guaranteed, so you should never rely on such behavior.

    Even worse, your code is not guaranteed to work even with volatile.
    The basic shape of your code is

    CompletableFuture<Integer> result = …
    
    while (shouldKeepReading) {
      result = result.thenCompose(…);
    }
    

    which implies that as long as the initial future is not already completed, this loop may chain an arbitrary number of dependent actions until the completion of the dependency chain manages to catch up. The system load caused by this loop may even prevent the chain from catching up, until encountering an OutOfMemoryError.

    As long as the completion chain manages to catch up, you don’t notice a difference, as all chained actions evaluate to the same result, zero, as soon as shouldKeepReading became false.

    Since the original future originates from promisesQueue.poll().get() outside the scope, we may simulate a higher workload by inserting a small delay. Then, add a counter to see what the end result doesn’t tell, e.g.

    AtomicInteger chainedOps = new AtomicInteger();
    
    CompletableFuture<Integer> result = promisesQueue.poll().get();
    result = result.whenCompleteAsync(
        (x,y) -> LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2)));
    
    while(shouldKeepReading) {
        result = result.thenCompose(res -> {
            chainedOps.incrementAndGet();
            if(conditionPass(res)) {
                shouldKeepReading = false;
                return CompletableFuture.completedFuture(0);
            } else {
                if (shouldKeepReading) {
                    return promisesQueue.poll().get();
                } else {
                    return CompletableFuture.completedFuture(0);
                }
            }
        });
    }
    result.join();
    System.out.println(chainedOps.get() + " chained ops");
    

    On my machine, the loop easily chains more than five million actions, even when conditionPass returns true in the first.

    The solution is quite simple. Use neither a flag variable nor a loop

    result = result.thenCompose(new Function<Integer, CompletionStage<Integer>>() {
        @Override
        public CompletionStage<Integer> apply(Integer res) {
            // for testing, do chainedOps.incrementAndGet();
            return conditionPass(res)? CompletableFuture.completedFuture(0):
                promisesQueue.poll().get().thenCompose(this);
          }
    });
    

    This calls thenCompose only when the condition is not fulfilled, hence, never chains more actions than necessary. Since it requires the function itself to be chained via thenCompose(this), the lambda has to be replaced by an anonymous inner class. If you don’t like this, you may resort to a recursive solution

    CompletableFuture<Integer> retryPoll() {
        CompletableFuture<Integer> result = promisesQueue.poll().get();
        return result.thenComposeAsync(res ->
            conditionPass(res)? CompletableFuture.completedFuture(0): retryPoll());
    }
    

    It’s remarkably simple here, as the retry doesn’t depend on the result of the previous evaluation (you’d need to introduce parameters otherwise), but on the changes promisesQueue.poll().get() makes to the program’s state.

    This method uses thenComposeAsync to avoid deep recursions if there is a large number of already completed futures whose result is rejected by conditionPass. If you know for sure that conditionPass will succeed after a rather small amount of retries, you can change thenComposeAsync to thenCompose.