Search code examples
javamultithreadingconcurrencythread-safetyguava

How to add latencies in the ConcurrentLinkedQueue which is part of ConcurrentMap map in atomic way?


I am calculating latencies in my application (in millseconds) and I want insert those metric in a thread safe list structure. And then I will use that list to calculate average, median, 95th percentile later on. So I looked it up and I didn't see much option with list so I decided to use ConcurrentLinkedQueue to store the latencies which is thread safe. If there is any other better thread safe data structure that I should use for this, let me know.

public class LatencyMetricHolder {
  private final ConcurrentLinkedQueue<Long> latenciesHolder = new ConcurrentLinkedQueue<>();

  private static class Holder {
    private static final LatencyMetricHolder INSTANCE = new LatencyMetricHolder();
  }

  public static LatencyMetricHolder getInstance() {
    return Holder.INSTANCE;
  }

  private LatencyMetricHolder() {}    

  public void addLatencies(long latency) {
    latenciesHolder.add(latency);
  }

  public ConcurrentLinkedQueue<Long> getLatenciesHolder() {
    return latenciesHolder;
  }
}

I am calling addLatencies method to populate the latencies from multithreaded code.

Now I want to have latenciesHolder for each processId which is a String. That means we can also get same processId multiple times and sometimes it will be a new processId, so somehow then I need to extract the latenciesHolder queue for that processId and add latency on that particular queue in a thread safe way and atomic way.

So I decided to use concurrent map for this as shown below where key will be processId:

private final Map<String, ConcurrentLinkedQueue<Long>> latenciesHolderByProcessId = Maps.newConcurrentMap();

Since I am using a map, then I need to synchronize on the creation of new ConcurrentLinkedQueue instances which is kind of tricky in Java 7 I guess as I am on Java 7.

What is the right way to populate this map in a atomic way from multiple threads without much contention and I want to use concurrent collections instead of conventional locking if there is any way?

Update:

  public void add(String type, long latencyInMs) {
    ConcurrentLinkedQueue<Long> latencyHolder = latenciesHolderByProcessId.get(type);
    if (latencyHolder == null) {
      latencyHolder = Queues.newConcurrentLinkedQueue();
      ConcurrentLinkedQueue<Long> currentLatencyHolder =
          latenciesHolderByProcessId.putIfAbsent(type, latencyHolder);
      if (currentLatencyHolder != null) {
        latencyHolder = currentLatencyHolder;
      }
    }
    latencyHolder.add(latencyInMs);
  }

Solution

  • ok, I'm not sure I get all the specs right, but this is one way to do it:

    public class LatencyMetricHolder {
    
     HashMap<Integer, ConcurrentLinkedQueue<Long>> map = new HashMap<>();
    
     public void addLatency(int processID, long latency) {
        synchronized(map) {
          ConcurrentLinkedQueue<Long> q = map.get(processID);
          if (q == null) {
            q = new ConcurrentLinkedQueue<Long>()
            map.put(processID, q);
          }
        }
        q.add(latency)
      }
    
      public ConcurrentLinkedQueue<Long> getLatenciesHolder(int processID) {
        synchronized(map) {
          return map.get(processId);
        }
      }
    }