Tags: java, multithreading, threadpool, executorservice, blockingqueue

Deadlock while calling add() and take() on a BlockingQueue


I'm experimenting with the java.util.concurrent package and using its interfaces and classes to learn how they work. I created a BlockingQueue instance (ArrayBlockingQueue implementation) and 50 consumers and 50 producers of type Runnable. Then I created a thread pool of size 4 with Executors.newFixedThreadPool(4) and submitted all of my consumers and producers to that pool (an ExecutorService). From the printed output, though, I eventually figured out that the program deadlocks. Can anyone please explain why a thread-safe queue gets deadlocked? Below is my code:

Consumer:

import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueConsumer implements Runnable {

    BlockingQueue<Integer> blockingQueue;
    int consumerNumber = 0;

    public ArrayBlockingQueueConsumer(BlockingQueue<Integer> blockingQueue, int consumerNumber) {
        this.blockingQueue = blockingQueue;
        this.consumerNumber = consumerNumber;
    }

    public void run() {
        int i = 0;
        while(i<60) {
            System.out.printf("Consumer %d going to take %d from blocking queue\n", consumerNumber, i);
            try {
                // Retrieve and remove the head of the shared queue
                int x = blockingQueue.take();
                System.out.println("The number " + x + " is taken from the queue.");
            } catch (InterruptedException e) {
                System.out.printf("Consumer %d interrupted while taking %d from blocking queue\n", consumerNumber, i);
                e.printStackTrace();
            }
            i++;
        }
    }
}

Producer:

import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueProducer implements Runnable {

    BlockingQueue<Integer> blockingQueue;
    int producerNumber = 0;

    public ArrayBlockingQueueProducer(BlockingQueue<Integer> blockingQueue, int producerNumber) {
        this.blockingQueue = blockingQueue;
        this.producerNumber = producerNumber;
    }

    public void run() {
        int i = 0;
        while(i<60) {
            System.out.printf("Producer %d going to add %d to blocking queue\n", producerNumber, i);
            // Insert the value at the tail of the shared queue
            blockingQueue.add(i);
            System.out.printf("Producer %d added %d to blocking queue\n", producerNumber, i);
            i++;
        }
    }

}

Executor Class (main() method class):

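import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingQueueExecutor {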

    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(50);

        ArrayBlockingQueueConsumer[] consumers = new ArrayBlockingQueueConsumer[200];
        ArrayBlockingQueueProducer[] producers = new ArrayBlockingQueueProducer[200];

        System.out.println("Hello hello :D");

        for (int i = 0; i < 200; i++) {
            consumers[i] = new ArrayBlockingQueueConsumer(blockingQueue, i+1);
            producers[i] = new ArrayBlockingQueueProducer(blockingQueue, i+1);
        }

        ExecutorService threadPool = Executors.newFixedThreadPool(4);

        for (int i = 0; i < 200; i++) {
            threadPool.execute(consumers[i]);
            threadPool.execute(producers[i]);
        }

        threadPool.shutdown();
    }
}

I ran the loops in the consumers and producers from 0 to 60 so that they would throw an exception when they found the queue empty or full, respectively. Ironically, none of the producers or consumers threw any exception.


Solution

  • I ran the loops in the consumers and producers from 0 to 60 so that they would throw an exception when they found the queue empty or full, respectively. Ironically, none of the producers or consumers threw any exception.

    take() doesn't throw an exception when the queue is empty. It waits until an element becomes available.

    E java.util.concurrent.BlockingQueue.take() throws InterruptedException

    Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
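
To make the difference concrete, here is a minimal sketch of how the standard BlockingQueue methods react to an empty or a full queue. The class name BlockingQueueMethodsDemo and its helper methods are made up for illustration and are not part of the question's code. A capacity-1 queue is used so that the "full" case is easy to reach.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the original post.
public class BlockingQueueMethodsDemo {

    // poll() returns null immediately when the queue is empty.
    static Integer pollOnEmpty() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        return queue.poll();
    }

    // poll(timeout, unit) waits at most the given time, then returns null.
    static Integer timedPollOnEmpty() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        return queue.poll(100, TimeUnit.MILLISECONDS);
    }

    // offer() signals a full queue by returning false.
    static boolean offerOnFull() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.add(1);
        return queue.offer(2);
    }

    // add() signals a full queue by throwing IllegalStateException.
    static boolean addOnFullThrows() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.add(1);
        try {
            queue.add(2);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // take() on an empty queue parks the calling thread until an element arrives.
    static boolean takeBlocksUntilElementArrives() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        });
        consumer.start();
        consumer.join(200);                        // give the consumer time to reach take()
        boolean stillWaiting = consumer.isAlive(); // still parked inside take()
        queue.put(42);                             // hand over an element to release it
        consumer.join();
        return stillWaiting;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("poll() on empty queue:       " + pollOnEmpty());
        System.out.println("timed poll() on empty queue: " + timedPollOnEmpty());
        System.out.println("offer() on full queue:       " + offerOnFull());
        System.out.println("add() on full queue throws:  " + addOnFullThrows());
        System.out.println("take() blocked while empty:  " + takeBlocksUntilElementArrives());
    }
}
```

Applied to the code in the question, this suggests one plausible reading of the hang, offered as an interpretation rather than a confirmed diagnosis: each consumer that calls take() on an empty queue holds on to one of the four pool threads while it waits. Once all four workers are parked in take() and the remaining producers are still waiting in the executor's task queue, nothing can add elements any more. A consumer that should give up instead of waiting forever could use poll(timeout, unit) and check for null.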