Tags: java, multithreading, duplicates, spring-batch, concurrenthashmap

Keeping track of duplicate inserts in a Map (multithreaded environment)


I am looking for a way to keep track of the number of times an insert with the same key is attempted on a Map in a multithreaded environment, where the Map can be read and updated by multiple threads at the same time. If tracking duplicate key insert attempts is not easily achievable, an alternative solution would be to kill the application at the first sign of a duplicate key insert attempt.

The following user-defined singleton Spring bean shows a global cache used by my application, which is loaded by multiple partitioned Spring Batch jobs (one job for each DataType to be loaded). The addResultForDataType method can be called by multiple threads at the same time.

public class JobResults {

    private Map<DataType, Map<String, Object>> results;

    public JobResults() {
        results = new ConcurrentHashMap<DataType, Map<String, Object>>();
    }

    public void addResultForDataType(DataType dataType, String uniqueId, Object result) {
        Map<String, Object> dataTypeMap = results.get(dataType);
        if (dataTypeMap == null) {
            synchronized (dataType) {
                dataTypeMap = results.get(dataType);
                if (dataTypeMap == null) {
                    dataTypeMap = new ConcurrentHashMap<String, Object>();
                    results.put(dataType, dataTypeMap);
                }
            }
        }
        dataTypeMap.put(uniqueId, result);
    }

    public Map<String, Object> getResultForDataType(DataType dataType) {
        return results.get(dataType);
    }

}

Here:

  • DataType can be thought of as the table name or file name from which the data is loaded. Each DataType represents one table or file.
  • uniqueId represents the primary key for each record in the table or file.
  • result is the object representing the entire row.
  • The above method is called once per record. At any given time, multiple threads can be inserting a record for the same DataType or a different DataType.
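As an aside, on Java 8+ the double-checked locking above can be collapsed into computeIfAbsent, which performs the same get-or-create atomically. A minimal sketch (DataType is replaced with String here only to keep the example self-contained, and the class name is illustrative):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class JobResultsSketch {

    private final Map<String, Map<String, Object>> results = new ConcurrentHashMap<>();

    // computeIfAbsent creates the inner map at most once per dataType,
    // atomically, replacing the explicit synchronized block.
    public void addResultForDataType(String dataType, String uniqueId, Object result) {
        results.computeIfAbsent(dataType, k -> new ConcurrentHashMap<>())
               .put(uniqueId, result);
    }

    public Map<String, Object> getResultForDataType(String dataType) {
        return results.get(dataType);
    }
}
```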

I thought of creating another map to keep track of the duplicate inserts:

public class JobResults {

    private Map<DataType, Map<String, Object>> results;
    private Map<DataType, ConcurrentHashMap<String, Integer>> duplicates;

    public JobResults() {
        results = new ConcurrentHashMap<DataType, Map<String, Object>>();
        duplicates = new ConcurrentHashMap<DataType, ConcurrentHashMap<String, Integer>>();
    }

    public void addResultForDataType(DataType dataType, String uniqueId, Object result) {
        Map<String, Object> dataTypeMap = results.get(dataType);
        ConcurrentHashMap<String, Integer> duplicateCount = duplicates.get(dataType);
        if (dataTypeMap == null) {
            synchronized (dataType) {
                // re-read both maps inside the lock in case another thread created them
                dataTypeMap = results.get(dataType);
                duplicateCount = duplicates.get(dataType);
                if (dataTypeMap == null) {
                    dataTypeMap = new ConcurrentHashMap<String, Object>();
                    duplicateCount = new ConcurrentHashMap<String, Integer>();
                    // publish the duplicates map first so no thread can see a
                    // non-null dataTypeMap with a null duplicateCount
                    duplicates.put(dataType, duplicateCount);
                    results.put(dataType, dataTypeMap);
                }
            }
        }
        duplicateCount.putIfAbsent(uniqueId, 0);
        duplicateCount.put(uniqueId, duplicateCount.get(uniqueId) + 1); // keep track of duplicate rows
        dataTypeMap.put(uniqueId, result);
    }

    public Map<String, Object> getResultForDataType(DataType dataType) {
        return results.get(dataType);
    }

}

I realize that the statement duplicateCount.put(uniqueId, duplicateCount.get(uniqueId) + 1); is not thread-safe: the get and the put are separate operations, so two threads can read the same count and one increment can be lost. Making it thread-safe would require synchronization, which would slow down my inserts. How can I keep track of the duplicate inserts without impacting the performance of my application? If tracking duplicate inserts is not easy, I would be fine with just throwing an exception at the first attempt to overwrite an existing entry in the map.

Note: I am aware that a Map does not allow duplicate keys. What I want is a way to keep track of any such attempts and halt the application rather than silently overwrite entries in the Map.
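For reference, the fail-fast behaviour I have in mind could be sketched like this (class name is illustrative), relying on the fact that ConcurrentHashMap.putIfAbsent atomically returns the existing value when the key is already present:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FailFastResults {

    private final Map<String, Object> dataTypeMap = new ConcurrentHashMap<>();

    // putIfAbsent returns non-null if the key was already present, so a
    // duplicate insert is detected atomically, without extra locking.
    public void addResult(String uniqueId, Object result) {
        Object previous = dataTypeMap.putIfAbsent(uniqueId, result);
        if (previous != null) {
            throw new IllegalStateException(
                "Duplicate insert attempted for key: " + uniqueId);
        }
    }

    public Object getResult(String uniqueId) {
        return dataTypeMap.get(uniqueId);
    }
}
```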


Solution

  • Try something like this:

        ConcurrentHashMap<String, AtomicInteger> duplicateCount = new ConcurrentHashMap<String, AtomicInteger>();
    

    Then when you're ready to increment a count, do this:

    final AtomicInteger oldCount = duplicateCount.putIfAbsent(uniqueId, new AtomicInteger(1));
    if (oldCount != null) {
        oldCount.incrementAndGet();
    }
    

    So, if there is no count in the map yet, putIfAbsent stores a counter initialized to 1; otherwise it returns the existing counter, which is then incremented atomically. This is thread-safe without any explicit locking.
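    Putting the pieces together, here is a self-contained sketch of the counting approach (class and method names are illustrative):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class DuplicateCounter {

    private final Map<String, AtomicInteger> attemptCount = new ConcurrentHashMap<>();

    // Returns the number of times this key has been inserted so far,
    // including the current attempt. Lock-free: putIfAbsent either
    // installs a fresh counter at 1, or hands back the existing
    // counter, which is then incremented atomically.
    public int recordInsert(String uniqueId) {
        AtomicInteger existing = attemptCount.putIfAbsent(uniqueId, new AtomicInteger(1));
        if (existing == null) {
            return 1;
        }
        return existing.incrementAndGet();
    }

    public int attemptsFor(String uniqueId) {
        AtomicInteger count = attemptCount.get(uniqueId);
        return count == null ? 0 : count.get();
    }
}
```

    Any key whose count ends up greater than 1 was a duplicate insert; alternatively, the caller can treat a return value greater than 1 as the signal to halt the application.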