
Nonblocking DelayQueue, Java


I'm using Java's DelayQueue to dispatch events after a one-second delay. The problem is that, under heavy load, my consumers on the DelayQueue block until a considerable bulk of offer() operations from other threads has completed.

Does anyone know of a non-blocking delay queue implementation in Java?


Solution

  • Unfortunately, DelayQueue is a blocking queue. Even poll() does not return immediately while the queue is being written to intensely, because every operation acquires the same lock.

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }
    

    So if many threads write to it, as Stephen said, there is not much you can do about that.

    I solved the problem by using ConcurrentSkipListSet with DelayedElement.

    import java.util.Objects;

    public class DelayedElement implements Comparable<DelayedElement> {

        private final Long initTime;   // timestamp used for ordering
        private final String msgId;

        public DelayedElement(Long initTime, String msgId) {
            this.initTime = initTime;
            this.msgId = msgId;
        }

        @Override
        public int hashCode() {
            int hash = 5;
            hash = 29 * hash + Objects.hashCode(this.initTime);
            hash = 29 * hash + Objects.hashCode(this.msgId);
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            final DelayedElement other = (DelayedElement) obj;
            return Objects.equals(this.initTime, other.initTime)
                    && Objects.equals(this.msgId, other.msgId);
        }

        @Override
        public int compareTo(DelayedElement o) {
            // Oldest first. ConcurrentSkipListSet treats compareTo() == 0 as a
            // duplicate, so ties on initTime are broken by msgId; otherwise two
            // messages with the same timestamp would collapse into one.
            int byTime = initTime.compareTo(o.initTime);
            return byTime != 0 ? byTime : msgId.compareTo(o.msgId);
        }
    }
    

    In my producer threads, I add each element stamped with its creation time. In my consumer thread, I simply read the elements that are at least a second old:

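    // Cutoff: anything stamped before this instant has waited at least one second.
    long diff = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(1000L);
    // DelayedElement's constructor takes (initTime, msgId); "" is a placeholder id.
    NavigableSet<DelayedElement> set = queue.headSet(new DelayedElement(diff, ""));
    // further processing goes here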
    

    This way I get non-blocking behavior and can safely write to and read from the collection at full throttle.
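
    The "further processing" step is left open above. As a rough sketch of one way to do it, the consumer could remove the due elements as it reads them by calling pollFirst() on the headSet view. The names SkipListDelayDemo, Timed, and drainDue are hypothetical and do not come from the answer; Timed is a simplified stand-in for DelayedElement, and the sketch assumes message ids are unique and non-null.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.NavigableSet;
    import java.util.concurrent.ConcurrentSkipListSet;
    import java.util.concurrent.TimeUnit;

    // Hypothetical demo class; none of these names come from the original answer.
    final class SkipListDelayDemo {

        // Simplified stand-in for DelayedElement: ordered by timestamp, then by id.
        static final class Timed implements Comparable<Timed> {
            final long initTime;
            final String msgId;

            Timed(long initTime, String msgId) {
                this.initTime = initTime;
                this.msgId = msgId;
            }

            @Override
            public int compareTo(Timed o) {
                int byTime = Long.compare(initTime, o.initTime);
                return byTime != 0 ? byTime : msgId.compareTo(o.msgId);
            }
        }

        // Removes and returns, oldest first, every element stamped strictly before cutoff.
        static List<Timed> drainDue(ConcurrentSkipListSet<Timed> queue, long cutoff) {
            // "" sorts before every other String, so the bound sits below all
            // elements stamped exactly at cutoff.
            NavigableSet<Timed> due = queue.headSet(new Timed(cutoff, ""), false);
            List<Timed> out = new ArrayList<>();
            Timed next;
            // pollFirst() on the view removes the element from the backing set.
            while ((next = due.pollFirst()) != null) {
                out.add(next);
            }
            return out;
        }

        public static void main(String[] args) throws InterruptedException {
            ConcurrentSkipListSet<Timed> queue = new ConcurrentSkipListSet<>();
            queue.add(new Timed(System.nanoTime(), "m1"));   // producer side
            Thread.sleep(1100);                                // let the element age
            long cutoff = System.nanoTime() - TimeUnit.SECONDS.toNanos(1);
            for (Timed t : drainDue(queue, cutoff)) {        // consumer side
                System.out.println("due: " + t.msgId);
            }
        }
    }
    ```

    Unlike DelayQueue.take(), nothing here waits for the next element to become due, so the consumer has to call drainDue periodically, for example from a scheduled task. That polling is the price of avoiding the shared lock.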