Search code examples
javamultithreadingjava-8java.util.concurrent

ConcurrentSkipListMap how to make remove and add calls atomic


I have N threads that add values and one removing thread. I am thinking of the best way how to sync adding to existing list of values and removing of the list.

I guess following case is possible:

 thread 1 checked condition containsKey, and entered in else block
 thread 2 removed the value
 thread 1 try to add value to existing list, and get returns null

I think the only approach that I can use is syncing by map value, in our case is List when we adding and when we deleting

    private ConcurrentSkipListMap<LocalDateTime, List<Task>> tasks = new ConcurrentSkipListMap<>();

    //Thread1,3...N
    public void add(LocalDateTime time, Task task) {
        if (!tasks.containsKey(time)) {
            tasks.computeIfAbsent(time, k -> createValue(task));
        } else {
             //potentially should be synced
            tasks.get(time).add(task);
        }
    }
    private List<Task> createValue(Task val) {
        return new ArrayList<>(Arrays.asList(val));
    }

    //thread 2
   public void remove()
    while(true){
        Map.Entry<LocalDateTime, List<Task>> keyVal = tasks.firstEntry();
        if (isSomeCondition(keyVal)) {
            tasks.remove(keyVal.getKey());
            for (Task t : keyVal.getValue()) {
                //do task processing
            }
        }
    }
   }

Solution

  • It’s not entirely clear what your remove() method is supposed to do. In its current form, it’s an infinite loop, first, it will iterate over the head elements and remove them, until the condition is not met for the head element, then, it will repeatedly poll for that head element and re-evaluate the condition. Unless, it manages to remove all elements, in which case it will bail out with an exception.

    If you want to process all elements currently in the map, you may simply loop over it, the weakly consistent iterators allow you to proceed while modifying it; you may notice ongoing concurrent updates or not.

    If you want to process the matching head elements only, you have to insert a condition to either, return to the caller or put the thread into sleep (or better add a notification mechanism), to avoid burning the CPU with a repeated failing test (or even throw when the map is empty).

    Besides that, you can implement the operations using ConcurrentSkipListMap when you ensure that there is no interference between the functions. Assuming remove is supposed to process all current elements once, the implementation may look like

    public void add(LocalDateTime time, Task task) {
        tasks.merge(time, Collections.singletonList(task),
            (l1,l2) -> Stream.concat(l1.stream(),l2.stream()).collect(Collectors.toList()));
    }
    
    public void remove() {
        for(Map.Entry<LocalDateTime, List<Task>> keyVal : tasks.entrySet()) {
            final List<Task> values = keyVal.getValue();
            if(isSomeCondition(keyVal) && tasks.remove(keyVal.getKey(), values)) {
                for (Task t : values) {
                    //do task processing
                }
            }
        }
    }
    

    The key point is that the lists contained in the map are never modified. The merge(time, Collections.singletonList(task), … operation will even store an immutable list of a single task if there was no previous mapping. In case there are previous tasks, the merge function (l1,l2) -> Stream.concat(l1.stream(),l2.stream()).collect(Collectors.toList()) will create a new list rather than modifying the existing ones. This may have a performance impact when the lists become much larger, especially when the operation has to be repeated in the case of contention, but that’s the price for not needing lock nor additional synchronization.

    The remove operation uses the remove(key, value) method which only succeeds if the map’s value still matches the expected one. This relies on the fact that neither of our methods ever modifies the lists contained in the map, but replaces them with new list instances when merging. If remove(key, value) succeeds, the list can be processed; at this time, it is not contained in the map anymore. Note that during the evaluation of isSomeCondition(keyVal), the list is still contained in the map, therefore, isSomeCondition(keyVal) must not modify it, though, I assume that this should be the case for a testing method like isSomeCondition anyway. Of course, evaluating the list within isSomeCondition also relies on the other methods never modifying the list.