I'm using Java's DelayQueue to dispatch events after a second delay. But the problem is that, under heavy load, my consumers on DelayQueue block until considerable bulk of offer() operations from another threads are gone.
Does anyone know non-blocking delay queue implementations in Java?
Unfortunately, DelayQueue is blocking queue and it doesn't return immediately if it's intensely being written to because it uses a lock.
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
So if many threads write to it, as Stephen said, there is not much you can do about that.
I solved the problem by using ConcurrentSkipListSet with DelayedElement.
public class DelayedElement implements Comparable<DelayedElement> {
private final Long initTime;
private final String msgId;
public DelayedElement(Long initTime, String msgId) {
this.initTime = initTime;
this.msgId = msgId;
}
@Override
public int hashCode() {
int hash = 5;
hash = 29 * hash + Objects.hashCode(this.initTime);
hash = 29 * hash + Objects.hashCode(this.msgId);
return hash;
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final DelayedElement other = (DelayedElement) obj;
if (!Objects.equals(this.initTime, other.initTime)) {
return false;
}
if (!Objects.equals(this.msgId, other.msgId)) {
return false;
}
return true;
}
@Override
public int compareTo(DelayedElement o) {
return -o.initTime.compareTo(initTime);
}
}
In my producers' threads, I add each element with a second's delay. In my consumer thread, I simply read elements which has a second's delay like:
long diff = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(1000L);
NavigableSet<DelayedElement> set = queue.headSet(
new DelayedElement(diff, "", null));
//further processing goes here
This way I achieve non-blocking nature and can safely write and read from the Collection at full throttle.