Search code examples
javaconcurrencywaitnotifyjava.util.concurrent

ConcurrentLinkedQueue with wait() and notify()


I am not well-versed in Multi-Threading. I am trying to take screenshot repeatedly by one producer thread, which adds the BufferedImage object to ConcurrentLinkedQueue and a Consumer Thread will poll queue for BufferedImage object to saving them in file. I could consume them by repeated polling(while loop), but I don't know how to consume them using notify() and wait(). I have tried using wait() and notify in smaller programs, but couldn't implement it here.

I have the following code:

class StartPeriodicTask implements Runnable {
    public synchronized void run() {
        Robot robot = null;
        try {
            robot = new Robot();
        } catch (AWTException e1) {
            e1.printStackTrace();
        }
        Rectangle screenRect = new Rectangle(Toolkit.getDefaultToolkit()
                .getScreenSize());
        BufferedImage image = robot.createScreenCapture(screenRect);
        if(null!=queue.peek()){
            try {
                System.out.println("Empty queue, so waiting....");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else{
            queue.add(image);
            notify();
        }
    }
}

public class ImageConsumer implements Runnable {
        @Override
        public synchronized void run() {
            while (true) {
                BufferedImage bufferedImage = null;
                if(null==queue.peek()){
                    try {
                        //Empty queue, so waiting....
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }else{
                    bufferedImage = queue.poll();
                    notify();
                }
                File imageFile = getFile();
                if (!imageFile.getParentFile().exists()) {
                    imageFile.getParentFile().mkdirs();
                }
                    try {
                        ImageIO.write(bufferedImage, extension, imageFile);
                        //Image saved
                    catch (IOException e) {
                        tracer.severe("IOException occurred. Image is not saved to file!");
                    }
                }
            }

Previously I had a repeated polling to check for existence of BufferedImage Object. Now I have changed run method as synchronised and tried to implement wait() and notify(). Am I doing correct? Please help. Thanks.


Solution

  • You are using the wrong Queue for the job. The ConcurrentLinkedQueue is a non-blocking Queue which means that there is no producer consumer semantics. If you are just doing one reader and one writer take a look at SynchronousQueue

    Simply put your code can be re-written as such

    BlockingQueue<?> queue = new SynchrnousQueue<?>();
    class StartPeriodicTask implements Runnable {
        public void run() {
            Robot robot = null;
            try {
                robot = new Robot();
            } catch (AWTException e1) {
                e1.printStackTrace();
            }
            Rectangle screenRect = new Rectangle(Toolkit.getDefaultToolkit()
                    .getScreenSize());
            BufferedImage image = robot.createScreenCapture(screenRect);
            queue.offer(image); //1
    }
    public class ImageConsumer implements Runnable {
            @Override
            public void run() {
                while (true) {
                    BufferedImage bufferedImage = queue.poll(); //2
    
                    File imageFile = getFile();
                    if (!imageFile.getParentFile().exists()) {
                        imageFile.getParentFile().mkdirs();
                    }
                        try {
                            ImageIO.write(bufferedImage, extension, imageFile);
                            //Image saved
                        catch (IOException e) {
                            tracer.severe("IOException occurred. Image is not saved to file!");
                        }
                }
    

    That's really it.

    Let me explain. At line //1 the producing thread will 'place' the image on the queue. I quotes place because a SynchrnousQueue has no depth. What actually happens is the thread tells the queue "If there are any threads asking for an element from this queue then give it the that thread and let me continue. If not I'll wait until another thread is ready"

    Line //2 is similar to 1 where the consuming thread just waits until a thread is offering. This works great with a single-reader single-writer