Search code examples
java multithreading blockingqueue

Java Producer Consumer ArrayBlockingQueue deadlock on take()


In my app there are two phases: one downloads some big data, and the other manipulates it. So I created two classes which implement Runnable, ImageDownloader and ImageManipulator, and they share a downloadedImagesBlockingQueue:

        /**
         * Worker that drains the shared queue of image URLs, downloads each image and
         * hands the result over to the shared queue of downloaded images.
         *
         * <p>Several instances are expected to run concurrently against the same two
         * queues. The URL queue is treated as pre-filled: a worker stops as soon as it
         * finds the queue empty.
         */
        public class ImageDownloader implements Runnable {

            private final ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue;
            private final ArrayBlockingQueue<String> imgUrlsBlockingQueue;

            /**
             * @param imgUrlsBlockingQueue          queue of URLs still to be downloaded
             * @param downloadedImagesBlockingQueue queue that receives every downloaded image
             */
            public ImageDownloader(ArrayBlockingQueue<String> imgUrlsBlockingQueue, ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue) {
                this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue;
                this.imgUrlsBlockingQueue = imgUrlsBlockingQueue;
            }

            /**
             * Downloads URLs until the URL queue is empty or the thread is interrupted.
             */
            @Override
            public void run() {
                String imgUrl;
                // poll() removes-or-returns-null in one atomic step. The previous
                // isEmpty() + take() pair let several workers pass the emptiness check
                // for the same last URL; the losers then blocked in take() forever.
                while ((imgUrl = this.imgUrlsBlockingQueue.poll()) != null) {
                    try {
                        ImageBean imageBean = doYourThing(imgUrl);
                        // put() waits for free space instead of throwing
                        // IllegalStateException when the queue is full.
                        this.downloadedImagesBlockingQueue.put(imageBean);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for the executor and stop working.
                        // NOTE: the URL polled in this iteration is not re-queued.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }

        /**
         * Worker that takes downloaded images from the shared queue and decrements a
         * shared counter once per image taken. The loop runs while the counter is
         * still positive.
         *
         * <p>Known problem (the subject of this question): the counter check and the
         * blocking {@code take()} are two separate steps. When several workers share
         * the same counter, more than one of them can see a positive value for the
         * same remaining image; only one gets it, and the others stay blocked in
         * {@code take()} because nothing else will ever be added to the queue.
         */
        public class ImageManipulator implements Runnable {

        // Queue filled by the downloaders; this worker only takes from it.
        private ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue;
        // Number of images still to be taken; Main passes the same instance to every worker.
        private AtomicInteger capacity;

        /**
         * @param downloadedImagesBlockingQueue queue to take downloaded images from
         * @param capacity                      counter of images still to be taken
         */
        public ImageManipulator(ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue,
                                AtomicInteger capacity) {
            this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue;
            this.capacity = capacity;
        }

        /**
         * Takes images while the counter is positive, decrementing it after each take.
         */
        @Override
        public void run() {
            // The check here and the take() below are not atomic with respect to other workers.
            while (capacity.get() > 0) {
                try {
                    ImageBean imageBean = downloadedImagesBlockingQueue.take(); // <- HERE I GET THE DEADLOCK
                    capacity.decrementAndGet();

                } catch (InterruptedException e) {
                    // The interruption is only printed; the loop then continues.
                    e.printStackTrace();
                }
                // ....
            }
        }
    }




    /**
     * Entry point: starts a pool of downloaders and a pool of manipulators that
     * communicate through a shared queue, then waits for both pools to finish.
     */
    public class Main {

        /** Number of worker threads (and tasks) in each pool. */
        private static final int WORKER_COUNT = 3;

        public static void main(String[] args) {
            String[] imageUrls = new String[]{"url1", "url2"};
            int capacity = imageUrls.length;

            ArrayBlockingQueue<String> imgUrlsBlockingQueue = initImgUrlsBlockingQueue(imageUrls, capacity);
            ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue = new ArrayBlockingQueue<>(capacity);

            ExecutorService downloaderExecutor = Executors.newFixedThreadPool(WORKER_COUNT);
            for (int i = 0; i < WORKER_COUNT; i++) {
                Runnable worker = new ImageDownloader(imgUrlsBlockingQueue, downloadedImagesBlockingQueue);
                downloaderExecutor.execute(worker);
            }
            downloaderExecutor.shutdown();

            ExecutorService manipulatorExecutor = Executors.newFixedThreadPool(WORKER_COUNT);
            // One counter shared by every manipulator.
            AtomicInteger manipulatorCapacity = new AtomicInteger(capacity);

            for (int i = 0; i < WORKER_COUNT; i++) {
                Runnable worker = new ImageManipulator(downloadedImagesBlockingQueue, manipulatorCapacity);
                manipulatorExecutor.execute(worker);
            }
            manipulatorExecutor.shutdown();

            // Block until BOTH pools are done. The previous loop spun on isTerminated()
            // at full CPU and, because of its "&&", stopped as soon as either pool finished.
            awaitCompletion(downloaderExecutor);
            awaitCompletion(manipulatorExecutor);
        }

        /** Waits without spinning until the given, already shut down, executor has terminated. */
        private static void awaitCompletion(ExecutorService executor) {
            try {
                executor.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever called main().
                Thread.currentThread().interrupt();
            }
        }
    }

The deadlock happens in this scenario: t1 checks the capacity and sees 1.

t2 checks the capacity and also sees 1.

t3 checks the capacity and also sees 1.

t2 takes the image, sets the capacity to 0, continues with the flow and eventually exits. t1 and t3 are now blocked forever in take(), because nothing will be added to the downloadedImagesBlockingQueue any more.

Ultimately I want something like this: when the capacity has been reached and the queue is empty, break out of the "while" loop and terminate gracefully.

Using "is the queue empty" as the only condition won't work, because the queue is empty at the start, until some ImageDownloader puts an imageBean into it.


Solution

  • I used some of the features that were suggested, but this is the complete solution that worked for me: it does not busy-wait, and the manipulator waits until a downloader notifies it.

    /**
     * Wires a manipulator to the queues, counter, locks and conditions it shares
     * with the other workers.
     *
     * @param downloadedImagesBlockingQueue  queue that {@code run()} polls images from
     * @param manipulatedImagesBlockingQueue queue that {@code run()} adds processed images to
     * @param capacity                       counter decremented once per image polled
     * @param manipulatedData                stored as-is; not used by the code shown here
     * @param downloaderReentrantLock        lock held while polling and while waiting for images
     * @param manipulatorReentrantLock       lock held while signalling the persisters
     * @param downloaderNotFull              condition awaited when no image is available
     *                                       (presumably created from {@code downloaderReentrantLock})
     * @param manipulatorNotFull             condition signalled after an image has been processed
     *                                       (presumably created from {@code manipulatorReentrantLock})
     */
    public ImageManipulator(LinkedBlockingQueue<ImageBean> downloadedImagesBlockingQueue,
                            LinkedBlockingQueue<ImageBean> manipulatedImagesBlockingQueue,
                            AtomicInteger capacity,
                            ManipulatedData manipulatedData,
                            ReentrantLock downloaderReentrantLock,
                            ReentrantLock manipulatorReentrantLock,
                            Condition downloaderNotFull,
                            Condition manipulatorNotFull) {
        // Shared state: remaining-work counter and result holder.
        this.capacity = capacity;
        this.manipulatedData = manipulatedData;

        // Input side: queue of downloaded images with its lock and condition.
        this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue;
        this.downloaderReentrantLock = downloaderReentrantLock;
        this.downloaderNotFull = downloaderNotFull;

        // Output side: queue of manipulated images with its lock and condition.
        this.manipulatedImagesBlockingQueue = manipulatedImagesBlockingQueue;
        this.manipulatorReentrantLock = manipulatorReentrantLock;
        this.manipulatorNotFull = manipulatorNotFull;
    }
    
    /**
     * Processes downloaded images until the shared counter reaches zero or this
     * thread is interrupted.
     *
     * <p>Each iteration polls one image while holding the downloader lock. If none
     * is available the worker waits on {@code downloaderNotFull} instead of
     * spinning. The worker that takes the last image wakes all waiting
     * manipulators so they can observe the zero counter and leave the loop.
     * The image itself is processed outside the lock.
     */
    @Override
    public void run() {
        while (capacity.get() > 0) {
            ImageBean imageBean;

            downloaderReentrantLock.lock();
            try {
                // Re-check under the lock: another manipulator may have taken the last image
                // while this thread was waiting to acquire it.
                if (capacity.get() <= 0) {
                    break;
                }

                imageBean = downloadedImagesBlockingQueue.poll();
                if (imageBean == null) {
                    // No downloader has finished yet (successfully or not): wait to be signalled,
                    // then start over so both the counter and the queue are re-checked.
                    downloaderNotFull.await();
                    continue;
                }

                if (capacity.decrementAndGet() == 0) {
                    // Last image taken: wake all manipulators still waiting for downloaded images.
                    downloaderNotFull.signalAll();
                }
            } catch (InterruptedException e) {
                logger.log(Level.ERROR, e.getMessage(), e);
                // Keep the interrupt visible to the owner of this thread and stop working.
                Thread.currentThread().interrupt();
                break;
            } finally {
                // Runs on every path out of the try (normal, continue, break, exception), so the
                // lock can no longer leak when the re-check fails or await() is interrupted.
                downloaderReentrantLock.unlock();
            }

            if (imageBean.getOriginalImage() != null) { // the downloader sets it to null iff the download failed.

                 // business logic
            }

            manipulatedImagesBlockingQueue.add(imageBean);

            signalAllPersisters(); // the persisters use the same lock/await scheme as this manipulator.
        }

        logger.log(Level.INFO, "Manipulator: " + Thread.currentThread().getId() + "  Ended Gracefully");
    }
    
    /**
     * Wakes every thread currently waiting on {@code manipulatorNotFull}.
     *
     * <p>The condition is signalled while holding {@code manipulatorReentrantLock}
     * (presumably the lock the condition was created from). The lock is released
     * in a {@code finally} block so it cannot stay held if signalling throws.
     */
    private void signalAllPersisters() {
        manipulatorReentrantLock.lock();
        try {
            manipulatorNotFull.signalAll();
        } finally {
            manipulatorReentrantLock.unlock();
        }
    }
    

    For the full flow, see this project on my GitHub: https://github.com/roy-key/image-service/