Search code examples
collections, java.util.concurrent, concurrenthashmap

Processing the collection concurrently in java


I am using the following code fragment to process a Java collection concurrently. Basically, I am using an ExecutorService to process the collection in multiple threads, each of which checks for duplicate transactions in the collection based on the transaction ID. There is no relationship between the transactions other than the duplicate check.

I want to know whether the following code has any concurrency issues.

public class Txn {
    private long id;
    private String status;

    @Override
    public boolean equals(Object obj) {
        return this.getId() == ((Txn) obj).getId();
    }

}


/**
 * Flags duplicate transactions concurrently, one task per transaction.
 *
 * <p>Each task claims its transaction id in a shared {@link ConcurrentHashMap}
 * via {@code putIfAbsent}; the task that loses the race marks its transaction
 * as {@code "duplicate"}. Afterwards the transactions are sorted into accepted
 * and rejected lists and the two sizes are printed.
 */
public class Main {
    /**
     * Builds 10,000 transactions over 1,000 distinct ids, runs the duplicate
     * check on a fixed thread pool and prints the accepted and rejected counts.
     *
     * @param args unused
     * @throws Exception if a task fails, the wait is interrupted, or the
     *         accepted list still contains a repeated id
     */
    public static void main(String[] args) throws Exception {
        List<Txn> list = new ArrayList<Txn>();
        List<Txn> acceptedList = new ArrayList<Txn>();
        List<Txn> rejectedList = new ArrayList<Txn>();
        for (long i = 0; i < 10000L; i++) {
            Txn txn = new Txn();
            txn.setId(i % 1000);
            list.add(txn);
        }

        final ConcurrentHashMap<Long, Integer> map = new ConcurrentHashMap<>();
        List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(list.size());
        for (final Txn txn : list) {
            tasks.add(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    // putIfAbsent is atomic: exactly one task per id sees null.
                    if (map.putIfAbsent(txn.getId(), 1) != null) {
                        txn.setStatus("duplicate");
                    }
                    return null;
                }
            });
        }

        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            // invokeAll blocks until every task has completed. Calling get()
            // rethrows any task failure instead of dropping it, and makes the
            // status writes done by the workers visible to this thread.
            for (java.util.concurrent.Future<Void> future : executorService.invokeAll(tasks)) {
                future.get();
            }
        } finally {
            // Always release the pool threads, even if a task failed.
            executorService.shutdown();
        }

        for (Txn txn : list) {
            if ("duplicate".equalsIgnoreCase(txn.getStatus())) {
                rejectedList.add(txn);
            } else {
                acceptedList.add(txn);
            }
        }

        // Sanity check on ids directly, so it does not depend on how Txn
        // implements hashCode().
        Set<Long> acceptedIds = new HashSet<>();
        for (Txn txn : acceptedList) {
            if (!acceptedIds.add(txn.getId())) {
                throw new IllegalStateException(
                        "accepted list contains duplicate transaction id " + txn.getId());
            }
        }
        System.out.println(acceptedList.size());
        System.out.println(rejectedList.size());
    }
}

Appreciate your comments. Thanks


Solution

  • The code below uses the divide-and-conquer method: it splits the list of transactions into multiple smaller lists, processes each list in a separate thread, and then merges the partitioned lists into a single list. As suggested by sturcotte06, the hashCode method needs to be overridden so that duplicate transactions are kept in the same list. The main advantage of this method is that there is no race condition when using the HashMap, because each map is used by only one thread.

    /**
     * Flags duplicate transactions by partitioning them so that all
     * transactions with the same id land in the same partition, then scanning
     * each partition on its own thread with a thread-confined {@link HashMap}.
     */
    public class MainDivideAndConquer {
        /**
         * Builds 10,000,000 transactions over 1,000 distinct ids, runs the
         * partitioned duplicate check and prints the partition sizes, the
         * elapsed whole seconds, and the accepted and rejected counts.
         *
         * @param args unused
         * @throws Exception if a partition task fails or the wait is interrupted
         */
        public static void main(String[] args) throws Exception {
            List<Txn> list = new ArrayList<Txn>();
            List<Txn> acceptedList = new ArrayList<Txn>();
            List<Txn> rejectedList = new ArrayList<Txn>();
            for (long i = 0; i < 10000000L; i++) {
                Txn txn = new Txn();
                txn.setId(i % 1000);
                txn.setStatus("sadden");
                list.add(txn);
            }
            long t1 = System.nanoTime();
            int cpuCount = Runtime.getRuntime().availableProcessors();
            final List<Txn>[] splittedArray = split(list, cpuCount);

            ExecutorService executorService = Executors.newFixedThreadPool(cpuCount);
            try {
                List<Future<List<Txn>>> futures = new ArrayList<>();
                for (int i = 0; i < cpuCount; i++) {
                    final List<Txn> splittedList = splittedArray[i];
                    System.out.println("list size:" + splittedList.size());
                    Callable<List<Txn>> callable = new Callable<List<Txn>>() {
                        // Only this task touches the map, so a plain HashMap is enough.
                        private final Map<Long, Integer> map = new HashMap<Long, Integer>();

                        @Override
                        public List<Txn> call() throws Exception {
                            for (Txn txn : splittedList) {
                                // put returns the previous value: non-null means the id was seen before.
                                if (map.put(txn.getId(), 1) != null) {
                                    txn.setStatus("duplicate");
                                }
                            }
                            return splittedList;
                        }
                    };
                    futures.add(executorService.submit(callable));
                }

                for (Future<List<Txn>> future : futures) {
                    // get() rethrows task failures and publishes the workers' status writes.
                    for (Txn txn : future.get()) {
                        if ("duplicate".equalsIgnoreCase(txn.getStatus())) {
                            rejectedList.add(txn);
                        } else {
                            acceptedList.add(txn);
                        }
                    }
                }
            } finally {
                // Release the pool threads even if a task failed, so the JVM can exit.
                executorService.shutdown();
            }
            long t2 = System.nanoTime();
            System.out.println("Time taken:" + (t2 - t1) / 1000000000);
            System.out.println(acceptedList.size());
            System.out.println(rejectedList.size());
        }

        /**
         * Distributes transactions over {@code n} lists so that transactions
         * with the same id always end up in the same list.
         *
         * <p>The partition is derived from the id itself rather than from
         * {@code Txn.hashCode()}, so it stays correct even if {@code Txn} does
         * not override {@code hashCode()}, and it never yields a negative index.
         *
         * @param transactions the transactions to distribute
         * @param n the number of partitions; must be positive
         * @return an array of {@code n} lists, some of which may be empty
         * @throws IllegalArgumentException if {@code n} is not positive
         */
        public static List<Txn>[] split(List<Txn> transactions, int n) {
            if (n <= 0) {
                throw new IllegalArgumentException("n must be positive: " + n);
            }
            // Generic array creation is not allowed; the array only ever holds List<Txn>.
            @SuppressWarnings({"unchecked", "rawtypes"})
            List<Txn>[] splitResult = new List[n];
            for (int i = 0; i < n; i++) {
                splitResult[i] = new ArrayList<Txn>();
            }

            for (Txn txn : transactions) {
                long id = txn.getId();
                // Adding n before the second modulo maps negative ids into [0, n).
                int bucket = (int) (((id % n) + n) % n);
                splitResult[bucket].add(txn);
            }

            return splitResult;
        }

    }