java, multithreading, wait, notify

Interthread communication between producer and consumer threads?


I am trying to learn inter-thread communication using a BlockingQueue.

I have written a producer that generates task IDs and inserts them into the BlockingQueue.
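For context, a producer of this kind might look roughly like the sketch below. The question does not show the real producer, so the class name `TaskProducer`, the fixed `taskCount`, and the choice to start IDs at 1 are all assumptions made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical producer: the class name and the fixed task count are assumptions.
class TaskProducer implements Runnable {
    private final BlockingQueue<Integer> queue;
    private final int taskCount;

    TaskProducer(BlockingQueue<Integer> queue, int taskCount) {
        this.queue = queue;
        this.taskCount = taskCount;
    }

    @Override
    public void run() {
        try {
            // Generate task IDs 1..taskCount and hand them to the consumers.
            for (int taskId = 1; taskId <= taskCount; taskId++) {
                queue.put(taskId); // blocks only if the queue is bounded and full
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and stop producing
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        new TaskProducer(queue, 4).run(); // run inline here; normally wrapped in a Thread
        System.out.println(queue);
    }
}
```

In a real program the producer would run on its own thread, for example `new Thread(new TaskProducer(queue, 100)).start()`.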

Now I have two consumer threads (named "1" and "0"). If a taskId is odd, it should be consumed by thread "1"; otherwise by thread "0".

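    @Override
    public void run() {
        while (true) {
            // Consume only while the head of the queue belongs to this consumer (name is "0" or "1").
            while (queue.peek() != null && name.equals(String.valueOf(queue.peek().intValue() % 2))) {
                try {
                    System.out.println(name + ",consumed," + queue.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and stop
                    return;
                }
            }
        }
    }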

How can I also make that check here?


Solution

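  • One approach (there may well be better ones): guard the check and the take with a shared lock, so that a consumer waits while the head of the queue belongs to the other consumer: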

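    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        try {
            while (true) {
                // Poll until the producer has added something (the sleep interval is arbitrary).
                while (queue.peek() == null) {
                    Thread.sleep(10);
                }

                synchronized (lock) {
                    // Wait while the head of the queue belongs to the other consumer.
                    while (queue.peek() != null && !name.equals(String.valueOf(queue.peek().intValue() % 2))) {
                        lock.wait();
                    }
                    // The head is now either ours or already gone. take() does not block here
                    // as long as every consumer removes elements only while holding lock.
                    if (queue.peek() != null) {
                        System.out.println(name + ",consumed," + queue.take());
                    }
                    lock.notifyAll(); // wake the other consumer so it re-checks the head
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and let the thread exit
        }
    }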
    

    Another way: use a second monitor, anotherLock, which the producer thread notifies whenever it adds an element to the queue. This replaces the polling loop on an empty queue.

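    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        try {
            while (true) {
                // Block until the queue is non-empty. The producer must call
                // anotherLock.notifyAll() inside synchronized (anotherLock) after each insert.
                synchronized (anotherLock) {
                    while (queue.peek() == null) {
                        anotherLock.wait();
                    }
                }

                synchronized (lock) {
                    // Wait while the head of the queue belongs to the other consumer.
                    while (queue.peek() != null && !name.equals(String.valueOf(queue.peek().intValue() % 2))) {
                        lock.wait();
                    }
                    if (queue.peek() != null) {
                        System.out.println(name + ",consumed," + queue.take());
                    }
                    lock.notifyAll(); // wake the other consumer so it re-checks the head
                }
            }
        } catch (InterruptedException e) {
            // wait() and take() throw a checked exception, so it has to be handled here.
            Thread.currentThread().interrupt();
        }
    }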
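To see the two-lock variant run end to end, here is a minimal self-contained sketch. It is only an illustration under some assumptions: the class name `ParityConsumerDemo`, the `runDemo` helper, the `CountDownLatch` used to detect completion, and shutting the consumers down via `interrupt()` are not part of the original question. The producer side is inlined and notifies `anotherLock` after every insert.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; all names here are illustrative.
class ParityConsumerDemo {

    static List<String> runDemo(int taskCount) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Object lock = new Object();           // guards the parity check and the take
        Object anotherLock = new Object();    // notified by the producer after each insert
        List<String> log = new ArrayList<>(); // only modified while holding lock
        CountDownLatch done = new CountDownLatch(taskCount);

        Runnable consumer = () -> {
            String name = Thread.currentThread().getName();
            try {
                while (true) {
                    synchronized (anotherLock) {
                        while (queue.peek() == null) {
                            anotherLock.wait(); // sleep until the producer adds something
                        }
                    }
                    synchronized (lock) {
                        Integer head;
                        // Wait while the head belongs to the other consumer.
                        while ((head = queue.peek()) != null
                                && !name.equals(String.valueOf(head % 2))) {
                            lock.wait();
                        }
                        if (head != null) {
                            log.add(name + ",consumed," + queue.take());
                            done.countDown();
                        }
                        lock.notifyAll(); // let the other consumer re-check the head
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit when the demo shuts down
            }
        };

        Thread even = new Thread(consumer, "0");
        Thread odd = new Thread(consumer, "1");
        even.start();
        odd.start();

        try {
            for (int taskId = 1; taskId <= taskCount; taskId++) {
                queue.put(taskId);
                synchronized (anotherLock) {
                    anotherLock.notifyAll(); // wake consumers waiting for a non-empty queue
                }
            }
            done.await(5, TimeUnit.SECONDS); // wait until every task has been consumed
            even.interrupt();
            odd.interrupt();
            even.join();
            odd.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return log;
    }

    public static void main(String[] args) {
        runDemo(4).forEach(System.out::println);
    }
}
```

Because every removal happens while holding `lock` and only the owner of the current head may take it, the log comes out in task order, alternating between consumer "1" and consumer "0". Notifying inside `synchronized (anotherLock)` after the `put` is what prevents a consumer from missing the signal between its `peek()` and its `wait()`.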