I am measuring latencies in my application (in milliseconds) and I want to insert those metrics into a thread-safe list structure. I will then use that list to calculate the average, median, and 95th percentile later on. I looked around and didn't see many thread-safe options for a List, so I decided to use ConcurrentLinkedQueue, which is thread safe, to store the latencies. If there is a better thread-safe data structure that I should use for this, let me know.
import java.util.concurrent.ConcurrentLinkedQueue;

public class LatencyMetricHolder {
    private final ConcurrentLinkedQueue<Long> latenciesHolder = new ConcurrentLinkedQueue<>();

    // Initialization-on-demand holder: INSTANCE is created lazily and safely.
    private static class Holder {
        private static final LatencyMetricHolder INSTANCE = new LatencyMetricHolder();
    }

    public static LatencyMetricHolder getInstance() {
        return Holder.INSTANCE;
    }

    private LatencyMetricHolder() {}

    public void addLatencies(long latency) {
        latenciesHolder.add(latency);
    }

    public ConcurrentLinkedQueue<Long> getLatenciesHolder() {
        return latenciesHolder;
    }
}
I am calling the addLatencies method to populate the latencies from multithreaded code.
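For the later average, median, and 95th percentile step, one possible approach is to copy the queue into a list, sort it, and read the values off by rank. The sketch below is only an illustration: LatencyStats and its method names are made up here, and it assumes the nearest-rank definition of a percentile, so the median of an even-sized sample is the lower middle value rather than an interpolated one. It avoids lambdas and streams so that it stays Java 7 compatible.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper, not part of the original code.
final class LatencyStats {
    private LatencyStats() {}

    // Copy first: the queue's iterator is weakly consistent, so working on a
    // copy gives a stable view while other threads keep adding latencies.
    static List<Long> sortedSnapshot(Collection<Long> latencies) {
        List<Long> copy = new ArrayList<Long>(latencies);
        Collections.sort(copy);
        return copy;
    }

    static double average(List<Long> sorted) {
        if (sorted.isEmpty()) {
            throw new IllegalArgumentException("no samples");
        }
        long sum = 0;
        for (long value : sorted) {
            sum += value;
        }
        return (double) sum / sorted.size();
    }

    // Nearest-rank percentile: the value at 1-based rank ceil(p * n / 100).
    static long percentile(List<Long> sorted, double p) {
        if (sorted.isEmpty()) {
            throw new IllegalArgumentException("no samples");
        }
        if (p <= 0 || p > 100) {
            throw new IllegalArgumentException("p must be in (0, 100]");
        }
        int rank = (int) Math.ceil(p * sorted.size() / 100.0);
        return sorted.get(Math.max(rank, 1) - 1);
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<Long>();
        for (long i = 1; i <= 100; i++) {
            queue.add(i);
        }
        List<Long> sorted = sortedSnapshot(queue);
        System.out.println("average = " + average(sorted));
        System.out.println("median  = " + percentile(sorted, 50));
        System.out.println("p95     = " + percentile(sorted, 95));
    }
}
```

Sorting a copy costs O(n log n) per report, which is probably fine if the stats are only computed occasionally; the queue itself is never modified by the reader.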
Now I want a separate latenciesHolder queue for each processId, which is a String. The same processId can arrive multiple times, and sometimes it will be a new processId, so I need to look up the latenciesHolder queue for that processId and add the latency to that particular queue in a thread-safe and atomic way.
So I decided to use a concurrent map for this, as shown below, where the key is the processId:
private final ConcurrentMap<String, ConcurrentLinkedQueue<Long>> latenciesHolderByProcessId = Maps.newConcurrentMap();
Since I am using a map, I need to synchronize the creation of new ConcurrentLinkedQueue instances, which I guess is kind of tricky in Java 7, and I am on Java 7.
What is the right way to populate this map atomically from multiple threads without much contention? I would like to use concurrent collections rather than conventional locking, if there is a way.
Update:
public void add(String type, long latencyInMs) {
    ConcurrentLinkedQueue<Long> latencyHolder = latenciesHolderByProcessId.get(type);
    if (latencyHolder == null) {
        latencyHolder = Queues.newConcurrentLinkedQueue();
        ConcurrentLinkedQueue<Long> currentLatencyHolder =
            latenciesHolderByProcessId.putIfAbsent(type, latencyHolder);
        if (currentLatencyHolder != null) {
            latencyHolder = currentLatencyHolder;
        }
    }
    latencyHolder.add(latencyInMs);
}
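As a sanity check on the get / putIfAbsent pattern in the update, here is a JDK-only sketch (no Guava) that hammers one key from several threads and counts what ends up in the queue. PerProcessLatencies, count, and hammer are hypothetical names used only for this illustration. The field is typed as ConcurrentMap rather than Map because on Java 7 putIfAbsent is declared on ConcurrentMap, not on Map.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical class for illustration only.
final class PerProcessLatencies {
    private final ConcurrentMap<String, ConcurrentLinkedQueue<Long>> byProcessId =
            new ConcurrentHashMap<String, ConcurrentLinkedQueue<Long>>();

    void add(String processId, long latencyInMs) {
        ConcurrentLinkedQueue<Long> queue = byProcessId.get(processId);
        if (queue == null) {
            ConcurrentLinkedQueue<Long> fresh = new ConcurrentLinkedQueue<Long>();
            // putIfAbsent returns the queue already mapped to the key if
            // another thread won the race, or null if ours was installed.
            ConcurrentLinkedQueue<Long> existing = byProcessId.putIfAbsent(processId, fresh);
            queue = (existing != null) ? existing : fresh;
        }
        queue.add(latencyInMs);
    }

    int count(String processId) {
        ConcurrentLinkedQueue<Long> queue = byProcessId.get(processId);
        return queue == null ? 0 : queue.size();
    }

    // Starts `threads` workers that each add `perThread` latencies for the
    // same processId, then returns how many values were recorded.
    static int hammer(final PerProcessLatencies holder, final String processId,
                      int threads, final int perThread) {
        final CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await(); // release all workers at once
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    for (int i = 0; i < perThread; i++) {
                        holder.add(processId, i);
                    }
                }
            });
            workers[t].start();
        }
        start.countDown();
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return holder.count(processId);
    }

    public static void main(String[] args) {
        int recorded = hammer(new PerProcessLatencies(), "p1", 8, 1000);
        System.out.println("recorded = " + recorded);
    }
}
```

The only cost of losing the race is one discarded empty queue per contended key, which seems an acceptable price for avoiding a lock on every add.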
OK, I'm not sure I've got all the specs right, but this is one way to do it:
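import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

public class LatencyMetricHolder {
    // Plain HashMap; every access is guarded by synchronized (map).
    // The key is a String because processId is a String in the question.
    private final Map<String, ConcurrentLinkedQueue<Long>> map = new HashMap<>();

    public void addLatency(String processId, long latency) {
        ConcurrentLinkedQueue<Long> q;
        synchronized (map) {
            q = map.get(processId);
            if (q == null) {
                q = new ConcurrentLinkedQueue<>();
                map.put(processId, q);
            }
        }
        // The queue itself is thread safe, so the add can happen outside the lock.
        q.add(latency);
    }

    public ConcurrentLinkedQueue<Long> getLatenciesHolder(String processId) {
        synchronized (map) {
            return map.get(processId);
        }
    }
}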