Tags: java, concurrency, java.util.concurrent

In Java, how to define a Future<V> that returns the best of several answers?


I need a value of type V that will have to be calculated asynchronously. Unfortunately, the best answer might take too long to calculate, so I have a couple of other values that I'll accept in a pinch. What I'd like to do is define a Future<V> that I can call with a timeout and have it return the best answer available in the time it had. Something like:

Future<V> theValue = // something involving executor.submit()
    // inside here, asynchronous calls to find
    V fallback = // something pretty quick to return ("default" is a reserved word in Java)
    V good = // something that will take longer and might not return in time
    V better = // something that will take longest but is the best answer

V v = theValue.get(5, TimeUnit.SECONDS); // now v should contain one of fallback,
    // good, or better, preferring better over good over fallback

I'm sure this is a fairly common pattern, but I haven't been able to find a good example. Any help?


Solution

  • In the proposed scenario, there are three versions of a computation (I will name them good, better, and best). Each subsequent version produces a result that is preferred over the one before it, but because of its added complexity it may take longer to complete. There is some arbitrary amount of time that the caller is willing to wait for any result; in this answer I will use five (5) seconds as that cutoff.

    It is possible to maintain this strict preference order while avoiding loops and queues by using a pair of latches for each batch of related operations.


    Primary logic

    ExecutorService execService = // ... e.g. new ThreadPoolExecutor(...)
    
    <T> T compute(
        Callable<? extends T> good,
        Callable<? extends T> better,
        Callable<? extends T> best) {
    
        RelatedCallables calls = new RelatedCallables(); // new for each batch
        Future<T> bestResult = execService.submit(calls.wrap(best)); // first wrapped is primary
        Future<T> betterResult = execService.submit(calls.wrap(better));
        Future<T> goodResult = execService.submit(calls.wrap(good));
    
        try {
            boolean primaryDone = calls.awaitPrimary(5, TimeUnit.SECONDS);
            if (!primaryDone) {
                calls.awaitAny(); // waits indefinitely, unless THIS thread is interrupted
            }
            // reaching here means at least one of the computations has finished
            // (a latch opens just before its Future reports isDone(), so trust primaryDone first)
            if (primaryDone || bestResult.isDone()) return bestResult.get();
            if (betterResult.isDone()) return betterResult.get();
            return goodResult.get();
        }
        catch (ExecutionException failedExecution) {
            // TODO: handling this is left as an exercise for the reader
            return null;
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt(); // restore the flag so callers can observe the interrupt
            return null; // TODO: choose a fallback that suits your application
        }
        finally {
            boolean sendInterrupt = true; // or false, depending on your needs
            goodResult.cancel(sendInterrupt);
            betterResult.cancel(sendInterrupt);
            bestResult.cancel(sendInterrupt);
        }
    }
    

    This solution uses a helper class, RelatedCallables (detailed below), to set up the relationship between the three computations, which are submitted as Callable instances. The helper wraps each instance, and the wrapper is submitted to the ExecutorService for parallel execution. In this implementation it is important that the best Callable be wrapped first; the order in which the other instances are wrapped and submitted does not matter.

    The combination of the awaitPrimary and awaitAny methods on the RelatedCallables helper, in conjunction with the if conditional, sets up our wait and timeout policy. If the best (primary) result becomes available within the specified timeout, the code skips the contents of the if block and proceeds directly to returning a result, so the caller does not have to wait the full five seconds.

    If awaitPrimary times out before the best computation completes, execution enters the body of the if block and waits indefinitely for any computation to complete. It is generally expected (but I do not assume) that at least one of the other computations will have completed during the primary wait; if so, awaitAny returns immediately. Instead of waiting indefinitely for one of the three computations, it is also possible, with relatively minor tweaks, to return null, return a predetermined value, or throw an exception.
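
    As a rough sketch of one such tweak, the fallback wait can be given its own time limit so the caller can tell when nothing finished at all. The names below (BoundedWaitSketch, Outcome, awaitResult, graceMillis) are hypothetical and not part of the answer's code; the method takes the two latches as parameters only so that the example stands alone.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class BoundedWaitSketch {
    enum Outcome { PRIMARY_READY, SOME_RESULT_READY, NOTHING_READY }

    // Two-stage wait: first the primary latch, then a second, bounded wait
    // on the latch that opens when any computation finishes.
    static Outcome awaitResult(CountDownLatch primaryLatch, CountDownLatch anyLatch,
                               long cutoffMillis, long graceMillis)
            throws InterruptedException {
        if (primaryLatch.await(cutoffMillis, TimeUnit.MILLISECONDS)) {
            return Outcome.PRIMARY_READY;
        }
        if (anyLatch.await(graceMillis, TimeUnit.MILLISECONDS)) {
            return Outcome.SOME_RESULT_READY;
        }
        // Nothing finished in time: the caller decides whether to return
        // null, return a predetermined value, or throw.
        return Outcome.NOTHING_READY;
    }
}
```

    On NOTHING_READY the caller could, for example, return a predetermined value or throw a TimeoutException instead of blocking in awaitAny.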

    Once the program flow passes the conditional block, it checks each Future in preference order and returns the value from the first one that indicates it has finished. Also note that the finally block attempts to cancel any pending computations.


    Helper (inner) class

    static class RelatedCallables {
        private final CountDownLatch primaryLatch = new CountDownLatch(1);
        private final CountDownLatch anyLatch = new CountDownLatch(1);
        private boolean hasPrimary;
    
        void awaitAny() throws InterruptedException {
            anyLatch.await();
        }
    
        boolean awaitPrimary(long timeout, TimeUnit unit) throws InterruptedException {
            return primaryLatch.await(timeout, unit);
        }
    
        <T> Callable<T> wrap(final Callable<? extends T> original) {
            final boolean isPrimary = !hasPrimary;
            hasPrimary = true;
    
            return new Callable<T>() {
                @Override
                public T call() throws Exception {
                    try {
                        return original.call();
                    }
                    finally {
                        anyLatch.countDown();
                        if (isPrimary) primaryLatch.countDown();
                    }
                }
            };
        }
    }
    

    This is a relatively simple Callable wrapper that chains to the wrapped instance. Its main purpose is to count down anyLatch once the wrapped instance completes, whether it returns normally or throws, because the countdown happens in a finally block. That is how awaitAny knows that at least one of the callables wrapped by this helper has completed.

    There is a second latch, used only with the first Callable passed to wrap, that logically distinguishes the primary (or best) instance. The wrapper for that instance also counts down this separate latch, so that awaitPrimary can return early, without waiting out the full timeout, when the best computation completes before the cutoff time expires.
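
    To illustrate how the timed wait behaves, here is a minimal, self-contained sketch of the timed CountDownLatch.await call that awaitPrimary delegates to. The class and method names (LatchEarlyReturnSketch, releasedInTime) and the sleep-based worker thread are assumptions made for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchEarlyReturnSketch {
    // Starts a worker that "computes" for workMillis, then opens the latch.
    // Returns true if the latch opened before timeoutMillis elapsed.
    static boolean releasedInTime(long workMillis, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(workMillis); // stand-in for the real computation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                latch.countDown(); // always signal completion, as the wrapper does
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive for the demo worker
        worker.start();
        // await returns true as soon as the count reaches zero,
        // or false once the timeout elapses first.
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

    With these assumptions, releasedInTime(10, 2000) should return true after roughly 10 ms rather than the full two seconds, while releasedInTime(2000, 50) should return false once the 50 ms timeout elapses.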

    Because CountDownLatch is not reusable, each distinct batch of Callable operations needs a new pair of latches. In this implementation, that is accomplished by creating a new instance of RelatedCallables for every batch.
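
    Putting the pieces together, the following sketch shows one way the helper and compute might be exercised end to end. It is a condensed adaptation rather than the answer's exact code: the class name BestOfThreeDemo, the slow and run helpers, the cutoffMillis parameter, the 500 ms cutoff, and the sleep-based stand-in computations are all assumptions made for illustration. It also relies on the boolean returned by awaitPrimary, on the assumption that a Future may report isDone() slightly after its latch opens.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class BestOfThreeDemo {
    // Condensed copy of the helper: one latch for the primary, one for "any".
    static class RelatedCallables {
        private final CountDownLatch primaryLatch = new CountDownLatch(1);
        private final CountDownLatch anyLatch = new CountDownLatch(1);
        private boolean hasPrimary;

        boolean awaitPrimary(long timeout, TimeUnit unit) throws InterruptedException {
            return primaryLatch.await(timeout, unit);
        }

        void awaitAny() throws InterruptedException {
            anyLatch.await();
        }

        <T> Callable<T> wrap(Callable<? extends T> original) {
            boolean isPrimary = !hasPrimary; // only the first wrapped callable is primary
            hasPrimary = true;
            return () -> {
                try {
                    return original.call();
                } finally {
                    anyLatch.countDown();
                    if (isPrimary) primaryLatch.countDown();
                }
            };
        }
    }

    // Hypothetical variant of compute() with a caller-supplied cutoff.
    static <T> T compute(ExecutorService pool, long cutoffMillis,
                         Callable<? extends T> good,
                         Callable<? extends T> better,
                         Callable<? extends T> best) throws Exception {
        RelatedCallables calls = new RelatedCallables(); // new for each batch
        Future<T> bestResult = pool.submit(calls.wrap(best)); // wrapped first, so primary
        Future<T> betterResult = pool.submit(calls.wrap(better));
        Future<T> goodResult = pool.submit(calls.wrap(good));
        try {
            boolean primaryDone = calls.awaitPrimary(cutoffMillis, TimeUnit.MILLISECONDS);
            if (!primaryDone) calls.awaitAny();
            if (primaryDone || bestResult.isDone()) return bestResult.get();
            if (betterResult.isDone()) return betterResult.get();
            return goodResult.get();
        } finally {
            goodResult.cancel(true);
            betterResult.cancel(true);
            bestResult.cancel(true);
        }
    }

    // Stand-in for a computation that takes roughly the given time.
    static Callable<String> slow(String value, long millis) {
        return () -> {
            Thread.sleep(millis);
            return value;
        };
    }

    // Runs one batch with a 500 ms cutoff; only the "best" duration varies.
    static String run(long bestMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            return compute(pool, 500,
                    slow("good", 10), slow("better", 100), slow("best", bestMillis));
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(50));    // best finishes well before the cutoff
        System.out.println(run(5_000)); // best misses the cutoff
    }
}
```

    On a lightly loaded machine this would be expected to print best and then better: in the first run the best computation beats the cutoff, and in the second only good and better finish in time, so better is preferred.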