
How can I release the semaphore when two futures are completed?


I have a Spring Boot Java application that talks to a Cassandra database and also uses the Google Guava libraries.

I am currently facing an issue with a Semaphore object in the code.

In my method I have to perform two write queries simultaneously using two objects (mapper and parameterisedListMsisdnMapper).

Firing each query through its mapper returns a ListenableFuture (future and future1). At the moment the permit is released as soon as future completes. How can I rewrite the code below so that the semaphore is released only once both future and future1 have completed?

public class ParameterisedListItemRepository {

    public ParameterisedListItemRepository() {
        this.executor = MoreExecutors.directExecutor();
        this.semaphore = new Semaphore(getNumberOfRequests(session));
    }

    public void saveAsync(ParameterisedListItem parameterisedListItem) {
        try {
            semaphore.acquire();
            ListenableFuture<Void> future = mapper.saveAsync(parameterisedListItem);
            ListenableFuture<Void> future1 = parameterisedListMsisdnMapper.saveAsync(mapParameterisedList(parameterisedListItem));
            future.addListener(() -> semaphore.release(), executor);
        } catch (InterruptedException e) {
            throw new RuntimeException("Semaphore was interrupted.");
        }
    }

}

I would appreciate any help.


Solution

  • I used Futures.whenAllSucceed and it worked:

      public void saveAsync(ParameterisedListItem parameterisedListItem) {
            if (parameterisedListItem.getId() == null) {
                parameterisedListItem.setId(UUID.randomUUID());
            }
            Set<ConstraintViolation<ParameterisedListItem>> violations = validator.validate(parameterisedListItem);
            if (violations != null && !violations.isEmpty()) {
                throw new ConstraintViolationException(violations);
            }
    
            // Returns the permit; typed as Callable<Void> to avoid a raw type.
            Callable<Void> releasePermit = () -> {
                semaphore.release();
                return null;
            };

            try {
                semaphore.acquire();
                ListenableFuture<Void> future1 = mapper.saveAsync(parameterisedListItem);
                ListenableFuture<Void> future2 = parameterisedListMsisdnMapper.saveAsync(mapParameterisedList(parameterisedListItem));
                // whenAllSucceed invokes releasePermit only if both futures succeed.
                Futures.whenAllSucceed(future1, future2).call(releasePermit, executor);

            } catch (InterruptedException e) {
                //FIXME handle exception in better way
                Thread.currentThread().interrupt(); // restore the interrupt flag
                throw new RuntimeException("Semaphore was interrupted.", e);
            }
        }
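
One caveat with the approach above: a combiner built with Futures.whenAllSucceed is skipped when either write fails, so the permit would not come back in that case. Guava's Futures.whenAllComplete has the same call(...) shape and runs the callable whether the inputs succeed or fail, so swapping it in is probably the smallest change. The sketch below illustrates the same "release after both finish, regardless of outcome" idea. It deliberately uses the JDK's CompletableFuture instead of Guava's ListenableFuture so it runs without extra dependencies, and the class and method names (PermitReleaseSketch, saveBoth, availablePermits) are hypothetical, not part of the original repository.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for the repository; only the permit handling is modelled.
class PermitReleaseSketch {
    private final Semaphore semaphore;

    PermitReleaseSketch(int permits) {
        this.semaphore = new Semaphore(permits);
    }

    // Takes one permit, then gives it back once BOTH writes have finished,
    // whether they completed normally or exceptionally.
    CompletableFuture<Void> saveBoth(CompletableFuture<Void> first,
                                     CompletableFuture<Void> second) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("Interrupted while waiting for a permit.", e);
        }
        // allOf completes only after every input future is done;
        // whenComplete runs its action on success and on failure alike.
        return CompletableFuture.allOf(first, second)
                .whenComplete((ignored, error) -> semaphore.release());
    }

    int availablePermits() {
        return semaphore.availablePermits();
    }
}
```

With a single permit, calling saveBoth with two pending futures leaves zero permits available until the second future finishes, even if that future fails. Releasing on failure matters here because a leaked permit permanently lowers the number of concurrent requests the semaphore allows.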