Tags: java, multithreading, priority-queue, java.util.concurrent, delayed-execution

Why doesn't calling take() on a Java DelayQueue block the whole data structure for all threads?


I'm trying to figure out how java.util.concurrent.DelayQueue works in a multithreaded environment. I see that this data structure uses a ReentrantLock internally, and that the lock is acquired at the beginning of each method of the class. I tried to implement a producer-consumer pattern using different methods of retrieving the head element from the queue (poll() / take()).

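import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;

// Note: DelayObject (a Delayed implementation nested in this class) is not shown in the question.
public class DelayQueueTest {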
    static DelayQueue<Delayed> delayQueue = new DelayQueue<>();
    public static final long executionTime = 2_000L;
    public static long timeStart;

    public static void go() throws InterruptedException {
        timeStart = System.currentTimeMillis();
        Thread consumer1 = new Thread(new Consumer());
        Thread producer = new Thread(new Producer());


        consumer1.start();
        Thread.sleep(200);
        producer.start();

        producer.join();
        consumer1.join();
    }

    static class Consumer implements Runnable {
        static DelayQueue<Delayed> delayQueue = DelayQueueTest.delayQueue;

        @Override
        public void run() {
            while (System.currentTimeMillis() - timeStart < executionTime) {
                System.out.println(String.format(
                        "Thread %s started taking at %d",
                        Thread.currentThread().getId(),
                        System.currentTimeMillis() - timeStart)
                );
                Delayed result;
                try {
                    result = delayQueue.take();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.print(String.format(
                        "Thread %s finished taking at %d... ",
                        Thread.currentThread().getId(),
                        System.currentTimeMillis() - timeStart)
                );

                if (result != null) System.out.println(String.format("with result %s", result.toString()));
                else System.out.println();
                System.out.format("Queue size: %d\n", delayQueue.size());
                try {
                    Thread.sleep(500);
                }
                catch (InterruptedException ex) {
                    System.out.println(ex);
                }
            }
        }
    }

    static class Producer implements Runnable {
        static DelayQueue<Delayed> delayQueue = DelayQueueTest.delayQueue;

        @Override
        public void run() {
            while (System.currentTimeMillis() - timeStart < executionTime) {
                System.out.println(String.format(
                        "Thread %s started offering at %d. Queue size: %d",
                        Thread.currentThread().getId(),
                        System.currentTimeMillis() - timeStart,
                        delayQueue.size())
                );
                delayQueue.offer(new DelayObject(300));
                System.out.println(String.format(
                        "Thread %s finished offering at %d. Queue size: %d",
                        Thread.currentThread().getId(),
                        System.currentTimeMillis() - timeStart,
                        delayQueue.size())
                );
                try {
                    Thread.sleep(300);
                }
                catch (InterruptedException ex) {
                    System.out.println(ex);
                }
            }
        }
    }
}
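The DelayObject class used by the producer is not included in the listing above. For anyone who wants to reproduce the experiment, here is a minimal sketch of what it might look like. It assumes the constructor argument is a delay in milliseconds; the field name and the top-level placement are my own choices, not the original code (the output below suggests the real one is nested inside DelayQueueTest).

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the DelayObject class that the question does not show.
// Assumption: the constructor argument is the delay in milliseconds.
class DelayObject implements Delayed {
    // Absolute expiry time on the System.nanoTime() clock.
    private final long expiresAtNanos;

    DelayObject(long delayMillis) {
        this.expiresAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    // Remaining delay; zero or negative means the element may be taken.
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    // Orders elements so that the one expiring first is at the head of the queue.
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}
```

With a class like this, new DelayObject(300) becomes available to take() roughly 300 ms after it was created.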

The question arose when I called take() on the consumer thread and then called offer() on the producer thread 200 ms later. I expected the consumer thread to block the whole data structure, since take() acquires the lock, so I did not expect offer() to be able to add an element to the queue (because offer() also needs to acquire the lock, but it is already held by take()). Overall, I was expecting a deadlock. However, it didn't happen. Here's the output of my program:

Thread 15 started taking at 2
Thread 16 started offering at 214. Queue size: 0
Thread 16 finished offering at 217. Queue size: 1
Thread 16 started offering at 527. Queue size: 1
Thread 16 finished offering at 527. Queue size: 1
Thread 15 finished taking at 527... with result org.example.DelayQueueTest$DelayObject@7148b5a8
Queue size: 1
Thread 16 started offering at 828. Queue size: 1
Thread 16 finished offering at 828. Queue size: 2
Thread 15 started taking at 1035
Thread 15 finished taking at 1035... with result org.example.DelayQueueTest$DelayObject@21819116
Queue size: 1
Thread 16 started offering at 1143. Queue size: 1
Thread 16 finished offering at 1143. Queue size: 2
Thread 16 started offering at 1453. Queue size: 2
Thread 16 finished offering at 1453. Queue size: 3
Thread 15 started taking at 1548
Thread 15 finished taking at 1548... with result org.example.DelayQueueTest$DelayObject@6bf427cb
Queue size: 2
Thread 16 started offering at 1769. Queue size: 2
Thread 16 finished offering at 1769. Queue size: 3

Can you explain why I get this output? Why did offer() execute successfully if it needed to acquire a lock that was already held?

Also, if you can suggest potential use cases for the DelayQueue class, I would be glad to hear them.


Solution

  • As you rightly noticed, DelayQueue.take() acquires the lock associated with the DelayQueue, and while it holds that lock, other threads cannot interact with the DelayQueue.

    However, if no element is available, take() calls available.await(); (see the source of DelayQueue.take() lines 242 and 249).

    available is a Condition associated with the DelayQueue's lock (source lines 103 and 129):

    private final transient ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
    

    Now, if you look up the documentation of ReentrantLock.newCondition(), it says:

    Returns a Condition instance for use with this Lock instance. The returned Condition instance supports the same usages as do the Object monitor methods (wait, notify, and notifyAll) when used with the built-in monitor lock.

    • [...]
    • When the condition waiting methods are called the lock is released and, before they return, the lock is reacquired and the lock hold count restored to what it was when the method was called.
    • [...]

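    That means that upon calling available.await(), the reentrant lock is released, which allows other threads (such as your producer calling offer()) to enter the locked methods of the DelayQueue. The waiting thread reacquires the lock before await() returns.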
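
    To see this behaviour in isolation, here is a small sketch that is not taken from the DelayQueue source. It mimics the same lock/condition pair with a hypothetical class AwaitReleasesLockDemo and a made-up itemReady flag: one thread takes the lock and parks in await, and a second thread then tries to acquire the same lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only; class and variable names are hypothetical.
public class AwaitReleasesLockDemo {

    // Returns true if a second thread could acquire the lock while the
    // first thread was waiting on the condition.
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        Condition available = lock.newCondition();
        boolean[] itemReady = {false};            // guarded by lock
        CountDownLatch waiterHoldsLock = new CountDownLatch(1);

        Thread waiter = new Thread(() -> {
            lock.lock();                          // like take(): acquire the lock first
            try {
                waiterHoldsLock.countDown();
                while (!itemReady[0]) {
                    // Releases the lock while parked, reacquires it before returning.
                    available.awaitUninterruptibly();
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.setDaemon(true);
        waiter.start();

        try {
            waiterHoldsLock.await();              // the waiter has definitely locked by now
            // Like offer(): this would time out if await() kept the lock held.
            boolean acquired = lock.tryLock(5, TimeUnit.SECONDS);
            if (acquired) {
                try {
                    itemReady[0] = true;
                    available.signal();           // wake the waiter, as offer() does
                } finally {
                    lock.unlock();
                }
            }
            waiter.join(5000);
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Acquired lock while other thread was in await(): " + run());
    }
}
```

    The second thread only attempts tryLock after the first one has locked, so it can succeed only because the wait released the lock. This is the same reason your offer() call went through while the consumer was blocked in take().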