java, hashmap, thread-safety

Thread safety with HashMap and ScheduledExecutorService


I've built this "throttled" task runner that collects some data in a HashMap and, at a fixed interval (every minute), sends that data "away" and clears the HashMap. In my tests I've noticed that the scheduled task can stop running and never clear the HashMap again. I'm assuming this is because the HashMap modifications I'm making are not thread-safe, and the task is crashing inside run() without recovery. I'm modifying the HashMap from both threads. Can somebody point me in the right direction on how to make my modifications of the HashMap thread-safe?

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class StatisticEventsDispatcher {
  public HashMap<String, AbstractStatisticsEvent> mappedCachedEvents = new HashMap<>();

  final Duration timeout = Duration.ofMinutes(1);

  final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);

  public StatisticEventsDispatcher(EventBus eventBus) {
    executor.scheduleAtFixedRate(
        new Runnable() {
          @Override
          public void run() {
            mappedCachedEvents.values().forEach(eventBus::post);
            mappedCachedEvents.clear();
            // i think this is not thread safe
          }
        },
        timeout.toMillis(),
        timeout.toMillis(),
        TimeUnit.MILLISECONDS);
  }

  public void applyChanges(String type, Map<String, Long> changes) {
    AbstractStatisticsEvent event;
    if (mappedCachedEvents.containsKey(type)) {
      event = mappedCachedEvents.get(type);
    } else {
      event = new AbstractStatisticsEvent(type);
      mappedCachedEvents.put(type, event);
      // i think this not thread safe
    }
    event.apply(changes);
  }
}

Solution

  • Yes, if two threads are modifying one HashMap without any concurrency control, it will not behave as you intend.

    While there are ConcurrentMap implementations, it appears that you want to "post" a snapshot of the collected events atomically: you don't want new events to be added, or existing events to be updated, while you are iterating over the collection, even if each individual map operation could be made thread-safe.

    It would be relatively simple to synchronize access to the shared map during iteration, but if iteration takes a while, updates from the main thread would be blocked during that time, which seems undesirable.
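    For reference, that fully-synchronized variant might look like the following sketch (the class and the string-based "events" are simplified placeholders, not the original types):

    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch: guard both updates and the whole flush with one lock.
    // While flush() iterates, applyChanges() callers block on the same lock.
    class SynchronizedDispatcher {
        private final Object lock = new Object();
        private final Map<String, Long> cachedEvents = new HashMap<>();

        void applyChanges(String type, long delta) {
            synchronized (lock) {
                cachedEvents.merge(type, delta, Long::sum);
            }
        }

        // Posts a snapshot and clears; holds the lock for the full iteration.
        List<String> flush() {
            List<String> posted = new ArrayList<>();
            synchronized (lock) {
                cachedEvents.forEach((type, count) -> posted.add(type + "=" + count));
                cachedEvents.clear();
            }
            return posted;
        }
    }
    ```

    This is correct, but a long-running flush stalls every producer for its full duration, which motivates the swap approach below.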

    Alternatively, the scheduled task could lock briefly to exchange the map with a new one:

    private final Object lock = new Object();
    private HashMap<String, AbstractStatisticsEvent> mappedCachedEvents = new HashMap<>();
    
    ...
    
        executor.scheduleAtFixedRate(() -> {
            Map<String, AbstractStatisticsEvent> toPost;
            synchronized (lock) {
                toPost = mappedCachedEvents;
                mappedCachedEvents = new HashMap<>();
            }
            toPost.values().forEach(eventBus::post);
        }, timeout.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
    
    ...
    
    public void applyChanges(String type, Map<String, Long> changes) {
        synchronized (lock) {
            AbstractStatisticsEvent event =
                mappedCachedEvents.computeIfAbsent(type, AbstractStatisticsEvent::new);
            event.apply(changes);
        }
    }
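    As for why your scheduled task "stops" entirely: scheduleAtFixedRate is documented so that if any execution of the task throws an exception, all subsequent executions are suppressed. An unsynchronized HashMap can throw (e.g. ConcurrentModificationException) under concurrent modification, which would silently kill the periodic flush. A small sketch that makes the suppression visible (the class name is just for illustration):

    ```java
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    class SuppressedTaskDemo {
        // Schedules a task every 10 ms that throws on its first run,
        // waits 200 ms, and reports how many times it actually ran.
        static int runCount() {
            AtomicInteger runs = new AtomicInteger();
            ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
            executor.scheduleAtFixedRate(() -> {
                runs.incrementAndGet();
                throw new RuntimeException("boom"); // first run fails...
            }, 0, 10, TimeUnit.MILLISECONDS);
            try {
                Thread.sleep(200); // ...plenty of time for ~20 more runs
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            executor.shutdownNow();
            return runs.get(); // stays at 1: later executions were suppressed
        }
    }
    ```

    If you want the task to survive sporadic failures, wrap the body of run() in a try/catch so no exception escapes to the executor.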