Search code examples
java multithreading performance threadpool callable

How can I manage the results of multiple Callable tasks without a specific order?


Basically something like this:

// Run both tasks on a two-thread pool.
ExecutorService service = Executors.newFixedThreadPool(2);

// resultFoo is an int, so futureFoo has to be a Future<Integer>;
// a Future<Boolean> would not compile at the get() assignment below.
Future<Integer> futureFoo = service.submit(myFooTask);
Future<Boolean> futureBar = service.submit(myBarTask);

int resultFoo;
boolean resultBar;
// get() blocks, so futureBar's result is not read until futureFoo has finished.
resultFoo = futureFoo.get();
resultBar = futureBar.get();

I want to handle each result independently as soon as it arrives, starting with whichever task finishes first, without having to wait for futureFoo to finish first.


Solution

  • If you want to return different types, you can use a common base class and downcast the result, for example like this:

    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    /**
     * Submits two tasks with different result types to a {@link CompletionService}
     * and handles each result as soon as its task finishes, regardless of the
     * order in which the tasks were submitted.
     */
    public class CompletionServiceExample {

        private static final Logger LOGGER = Logger.getLogger("CompletionServiceExample");

        /**
         * Runs the example.
         *
         * @param args ignored
         * @throws InterruptedException if the main thread is interrupted while waiting
         */
        public static void main(String[] args) throws InterruptedException {
            new CompletionServiceExample().doTheWork();
        }

        /**
         * Submits the tasks, consumes their results in completion order and shuts
         * the pool down again.
         *
         * @throws InterruptedException if the calling thread is interrupted while
         *         waiting for a result or for the pool to terminate
         */
        private void doTheWork() throws InterruptedException {
            final ExecutorService executorService = Executors.newFixedThreadPool(2);
            try {
                final CompletionService<Base> completionService =
                        new ExecutorCompletionService<>(executorService);

                completionService.submit(new FooBase());
                completionService.submit(new BarBase());
                final int totalTasks = 2;

                for (int i = 0; i < totalTasks; ++i) {
                    // take() blocks until any submitted task has completed. An
                    // interruption is propagated to the caller instead of being
                    // swallowed, so the loop never keeps waiting after a cancel request.
                    final Future<Base> finished = completionService.take();
                    try {
                        handleResult(finished.get());
                    } catch (ExecutionException e) {
                        // One failed task must not prevent the others from being handled.
                        LOGGER.log(Level.WARNING, "Error while processing task. ", e);
                    }
                }

                executorService.shutdown();
                executorService.awaitTermination(5, TimeUnit.SECONDS);
            } finally {
                // Does nothing once the pool has terminated; on interruption or an
                // unexpected error it cancels whatever is still running so the
                // worker threads do not keep the JVM alive.
                executorService.shutdownNow();
            }
        }

        /**
         * Prints the value carried by a finished task, depending on its concrete type.
         *
         * @param base the object returned by a completed task
         */
        private static void handleResult(Base base) {
            if (base instanceof FooBase) {
                int myInteger = ((FooBase) base).getValue();
                System.out.println("received value: " + myInteger);
            } else if (base instanceof BarBase) {
                boolean myBoolean = ((BarBase) base).isValue();
                System.out.println("received value: " + myBoolean);
            }
        }
    }
    
    /**
     * Common supertype of the task results, so that a single
     * {@code CompletionService<Base>} can carry every kind of result.
     * It declares no members of its own.
     */
    class Base {
    }
    
    /**
     * Task producing an {@code int}. It returns itself from {@link #call()} so the
     * consumer can downcast and read the value through {@link #getValue()}.
     */
    class FooBase extends Base implements Callable<Base> {

        /** Pause, in milliseconds, before the value is stored. */
        private static final long PAUSE_MILLIS = 5000L;

        /** Value stored once the pause is over. */
        private static final int COMPUTED_VALUE = 10;

        private int value;

        /**
         * @return the stored value; {@code 0} until {@link #call()} has completed
         */
        public int getValue() {
            return this.value;
        }

        /**
         * Sleeps, stores the value and hands this instance back as the result.
         *
         * @return this instance
         * @throws Exception if the sleep is interrupted
         */
        @Override
        public Base call() throws Exception {
            Thread.sleep(PAUSE_MILLIS);
            this.value = COMPUTED_VALUE;
            return this;
        }
    }
    
    /**
     * Task producing a {@code boolean}. It returns itself from {@link #call()} so
     * the consumer can downcast and read the value through {@link #isValue()}.
     */
    class BarBase extends Base implements Callable<Base> {

        /** Pause, in milliseconds, before the value is stored. */
        private static final long PAUSE_MILLIS = 1000L;

        /** Value stored once the pause is over. */
        private static final boolean COMPUTED_VALUE = false;

        private boolean value;

        /**
         * @return the stored value
         */
        public boolean isValue() {
            return this.value;
        }

        /**
         * Sleeps, stores the value and hands this instance back as the result.
         *
         * @return this instance
         * @throws Exception if the sleep is interrupted
         */
        @Override
        public Base call() throws Exception {
            Thread.sleep(PAUSE_MILLIS);
            this.value = COMPUTED_VALUE;
            return this;
        }
    }