
Retry policy in CompletionService


I need to configure a retry policy for API calls made through an ExecutorCompletionService.

Sample Code:

public void func() throws Exception{
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);
    List<Future<String>> list = new ArrayList<Future<String>>();
    for(int i=0; i<10; i++) {
        AsyncTest asyncTest = new AsyncTest();
        Future<String> futureString = completionService.submit(asyncTest);
        list.add(futureString);
    }
    while (list.size() > 0) {
        Future<String> futureResponse = completionService.take();
        System.out.println(futureResponse.get());
        list.remove(futureResponse);
    }
    executorService.shutdown();
}
public class AsyncTest implements Callable<String> {
       public String call() throws Exception {
              //returns a response from api call
              //this is a network call and throws TimeoutException
       }
}

What's the best way to implement a retry policy for the TimeoutException thrown while calling the API?


Solution

  • I have extended your AsyncTest class so that it records whether the last attempt timed out and how many attempts were made:

    public class RetryableAsyncTest implements Callable<RetryableAsyncTest> {
    
       private final String  _name;
       private /* */ String  _value;
       private /* */ boolean _timeouted;
       private /* */ int     _retryCount;
    
       public RetryableAsyncTest( String name ) {
          _name = name;
       }
    
       @Override
       public RetryableAsyncTest call() throws Exception {
          try {
             ++_retryCount;
             _timeouted = false;
             //-------- Beginning of functional code
             if( Math.random() > 0.5 ) {      // Simulation of
                throw new TimeoutException(); // timeout condition
             }
             _value = "computation result";
             //-------- End of functional code
          }
          catch( final TimeoutException x ) {
             _timeouted = true;
          }
          return this;
       }
    
       public String getName() {
          return _name;
       }
    
       public String getValue() {
          return _value;
       }
    
       public boolean isTimeouted() {
          return _timeouted;
       }
    
       public int getRetryCount() {
          return _retryCount;
       }
    }
    

    RetryableAsyncExecutor class:

    public class RetryableAsyncExecutor {
    
       private final ExecutorService                       _exec;
       private final CompletionService<RetryableAsyncTest> _comp;
    
       public RetryableAsyncExecutor( int nThreads ) {
          _exec = Executors.newFixedThreadPool( nThreads );
          _comp = new ExecutorCompletionService<>( _exec );
       }
    
       public void submit( RetryableAsyncTest task ) {
          _comp.submit( task );
       }
    
       public RetryableAsyncTest get() throws Exception {
          final Future<RetryableAsyncTest> f = _comp.take();
          final RetryableAsyncTest task = f.get();
          if( task.isTimeouted()) {
             _comp.submit( task );
          }
          return task;
       }
    
       public void shutdown() {
          _exec.shutdown();
       }
    }
    

    Test case:

    public class Main {
    
       public static void main( String[] args ) {
          final int COUNT = 8;
          final RetryableAsyncExecutor re = new RetryableAsyncExecutor( 5 );
          try {
             for( int i = 0; i < COUNT; ++i ) {
                re.submit( new RetryableAsyncTest("Async#"+(i+1)));
             }
             int count = 0;
             while( count < COUNT ) {
                final RetryableAsyncTest task = re.get();
                if( task.isTimeouted()) {
                   System.err.printf( "%s: retrying (%d)\n",
                      task.getName(), task.getRetryCount());
                }
                else {
                   System.err.printf( "%s: done with '%s'.\n",
                      task.getName(), task.getValue());
                   ++count;
                }
             }
          }
          catch( final Throwable t ) {
             t.printStackTrace();
          }
          re.shutdown();
          System.exit( 0 );
       }
    }
    

    Execution log:

    Async#4: done with 'computation result'.
    Async#1: done with 'computation result'.
    Async#6: retrying (1)
    Async#3: done with 'computation result'.
    Async#8: done with 'computation result'.
    Async#7: retrying (1)
    Async#2: done with 'computation result'.
    Async#5: retrying (1)
    Async#6: done with 'computation result'.
    Async#7: done with 'computation result'.
    Async#5: retrying (2)
    Async#5: done with 'computation result'.
    

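    If you want to cap the number of retries, that logic belongs in the RetryableAsyncExecutor.get() method, as an if-then-else condition around _comp.submit( task ): resubmit only while the task's retry count is below the limit, and otherwise give up on it. The caller's loop must then also count abandoned tasks as finished, or it will wait forever.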
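
A capped version could look like the minimal sketch below. It is an illustration under assumptions, not part of the answer's code: the names CappedRetryDemo, FlakyTask, runAll and maxAttempts are hypothetical, and the task fails a fixed number of times instead of randomly so the outcome is reproducible. The sketch reads the task's state before resubmitting it, so the calling thread never inspects a task that a pool thread may already be running again.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

class CappedRetryDemo {

   /** Hypothetical task: times out a fixed number of times, then succeeds. */
   static class FlakyTask implements Callable<FlakyTask> {
      final String name;
      final int failuresBeforeSuccess;
      int attempts;      // number of times call() has run
      boolean timedOut;  // outcome of the most recent attempt

      FlakyTask(String name, int failuresBeforeSuccess) {
         this.name = name;
         this.failuresBeforeSuccess = failuresBeforeSuccess;
      }

      @Override
      public FlakyTask call() {
         ++attempts;
         try {
            if (attempts <= failuresBeforeSuccess) {
               throw new TimeoutException(); // simulated timeout
            }
            timedOut = false;
         } catch (TimeoutException x) {
            timedOut = true;
         }
         return this;
      }
   }

   /** Runs all tasks, retrying timeouts up to maxAttempts; returns the number that succeeded. */
   static int runAll(List<FlakyTask> tasks, int maxAttempts, int nThreads) {
      ExecutorService exec = Executors.newFixedThreadPool(nThreads);
      CompletionService<FlakyTask> comp = new ExecutorCompletionService<>(exec);
      try {
         for (FlakyTask t : tasks) {
            comp.submit(t);
         }
         int pending = tasks.size();
         int succeeded = 0;
         while (pending > 0) {
            FlakyTask task = comp.take().get();
            if (!task.timedOut) {
               ++succeeded;
               --pending;
            } else if (task.attempts < maxAttempts) {
               comp.submit(task); // retry: the task stays pending
            } else {
               --pending;         // cap reached: give up on this task
            }
         }
         return succeeded;
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(e);
      } catch (ExecutionException e) {
         throw new IllegalStateException(e);
      } finally {
         exec.shutdown();
      }
   }

   public static void main(String[] args) {
      List<FlakyTask> tasks = Arrays.asList(
         new FlakyTask("a", 0),   // succeeds on the first attempt
         new FlakyTask("b", 2),   // succeeds on the third attempt
         new FlakyTask("c", 5));  // would need six attempts, so it is abandoned
      int ok = runAll(tasks, 3, 2);
      System.out.println("succeeded=" + ok + ", c.attempts=" + tasks.get(2).attempts);
      // prints: succeeded=2, c.attempts=3
   }
}
```

Tracking a pending counter rather than a success counter is what lets the loop terminate when a task is abandoned; a delay between attempts could be added before the resubmission if the API needs time to recover.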