Search code examples
javamultithreadingconcurrencyfuturetask

use FutureTask for concurrency


I have a service like:

class DemoService {
    Result process(Input in) {
        filter1(in);
        if (filter2(in)) return...
        filter3(in);
        filter4(in);
        filter5(in);
        return ...

    }
}

Now I want it faster and I found that some filters can start at the same time, while some filters must wait for others to finish. For example:

filter1--
         |---filter3--
filter2--             |---filter5
          ---filter4--

which means:

1.filter1 and filter2 can start at the same time, so do filter3 and filter4

2.filter3 and filter4 must wait for filter2 to finish

one more thing:

if filter2 returns true, then the 'process' method returns immediately and ignores the following filters.

now my solution is using FutureTask:

            // do filter's work at FutureTask
        for (Filter filter : filters) {
            FutureTask<RiskResult> futureTask = new FutureTask<RiskResult>(new CallableFilter(filter, context));
            executorService.execute(futureTask);
        }

        //when all FutureTask are submitted, wait for result
        for(Filter filter : filters) {
            if (filter.isReturnNeeded()) {
                FutureTask<RiskResult> futureTask = context.getTask(filter.getId());
                riskResult = futureTask.get();
                if (canReturn(filter, riskResult)) {
                    returnOk = true;
                    return riskResult;
                }
            }
        }

my CallableFilter:

public class CallableFilter implements Callable<RiskResult> {

    private Filter filter;
    private Context context;

    @Override
    public RiskResult call() throws Exception {
        List<Filter> dependencies = filter.getDependentFilters();
        if (dependencies != null && dependencies.size() > 0) {

            //wait for its dependency filters to finish
            for (Filter d : dependencies) {
                FutureTask<RiskResult> futureTask = context.getTask(d.getId());
                futureTask.get();

            }
        }

        //do its own work
        return filter.execute(context);
    }
}

I want to know:

1.is it a good idea to use FutureTask in the case? is there a better solution?

2.the overhead of thread context switch.

thanks!


Solution

  • In Java 8 you can use CompletableFuture to chain your filters after each other. Use the thenApply and thenCompose family of methods in order to add new asynchronous filters to the CompletableFuture - they will execute after the previous step is finished. thenCombine combines two independent CompletableFutures when both are finished. Use allOf to wait for the result of more than two CompletableFuture objects.

    If you can't use Java 8, then the Guava ListenableFuture can do the same, see Listenable Future Explained. With Guava you can wait for multiple independently running filters to finish with Futures.allAsList - this also returns a ListenableFuture.

    With both approaches the idea is that after you declare your future actions, their dependencies on each other, and their threads, you get back a single Future object, which encapsulates your end result.

    EDIT: The early return could be implemented by explicitly completing the CompletableFuture with the complete() method or using a Guava SettableFuture (which implements ListenableFuture)