Search code examples
javamultithreadingjava-threads

Three Threads Comnunition and Synchronize


Background: I have three threads. ThreadA is responsible for writing elements to the queue, and if the queue is full, then notify ThreadC to read elements from the queue. ThreadB is another condition, if the queue is not full, but the time is beyond 5 seconds, then Thread notice threadC to take elements from the queue, last, ThreadC notice ThreadB to refresh its timestamp.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
    private Lock lock = new ReentrantLock();
    private Condition conA = lock.newCondition();
    private Condition conB = lock.newCondition();
    private Condition conC = lock.newCondition();

    ArrayBlockingQueue<Integer> readQueueA = new ArrayBlockingQueue<>(3);

    public static void main(String[] args) {
        Main main1 = new Main();

        try {
            ThreadA threadWrite = main1.new ThreadA();
            Thread threadOut = new Thread(threadWrite);
            threadOut.start();

            ThreadB threadB = main1.new ThreadB();
            Thread threadBB = new Thread(threadB);
            threadBB.start();

            ThreadC threadRead = main1.new ThreadC();
            Thread threadIn = new Thread(threadRead);
            threadIn.start();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    class ThreadA implements Runnable {
        public void addElement(Integer i) {
            try {
                readQueueA.put(i);
            } catch (Exception ex ){
                ex.printStackTrace();
            }
        }

        @Override
        public void run() {
            int i = 0;
            while (i < 10) {
                lock.lock();
                try {
                    if (readQueueA.size() < 3) {
                        addElement(i++);
                    } else {
                        System.out.println("notice C: " + new Date());
                        conC.signal();
                        conA.await();
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class ThreadB implements Runnable {
        @Override
        public void run() {
            reSetStartTime();
            while (true) {
                lock.lock();
                try {
                    if (!conB.await(5, TimeUnit.SECONDS)) {
                        System.out.println("Timeout Zzzzzzz: " + new Date());
                        conC.signal();
                    } else {
                        System.out.println("RefreshB。。。。" + new Date());
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class ThreadC implements Runnable {
        public void printQueue(ArrayBlockingQueue<Integer> printQueue) {
            int len = printQueue.size();
            System.out.println("Queen Size :" + printQueue.size());
            for (int i = 0; i < len; i++) {
                System.out.print(printQueue.poll());
            }
            System.out.println();
        }

        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    System.out.println("I'm thread C " + new Date());
                    conC.await();
                    System.out.println("I'm thread C, and I wake up " + new Date());
                    if (readQueueA.size() > 0) {
                        System.out.print("OUTPUT: ");
                        printQueue(readQueueA);
                        conA.signal();
                        conB.signal();
                    } else {
                        System.out.println("No elements");
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}

Output:

"C:\Program Files\Java\jdk1.8.0_181\bin\java.exe" 
notice C: Fri Sep 21 10:07:35 CST 2018
I'm thread C Fri Sep 21 10:07:35 CST 2018
TimeOut Zzzzzzz: Fri Sep 21 10:07:40 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:40 CST 2018
OUTPUT: Queen Size :3
012
I'm thread C Fri Sep 21 10:07:40 CST 2018
notice C: Fri Sep 21 10:07:40 CST 2018
RefreshB。。。。Fri Sep 21 10:07:40 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:40 CST 2018
OUTPUT: Queen Size :3
345
I'm thread C Fri Sep 21 10:07:40 CST 2018
notice C: Fri Sep 21 10:07:40 CST 2018
RefreshB。。。。Fri Sep 21 10:07:40 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:40 CST 2018
OUTPUT: Queen Size :3
678
I'm thread C Fri Sep 21 10:07:40 CST 2018
RefreshB。。。。Fri Sep 21 10:07:40 CST 2018
TimeOut Zzzzzzz: Fri Sep 21 10:07:45 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:45 CST 2018
OUTPUT: Queen Size :1
9
I'm thread C Fri Sep 21 10:07:45 CST 2018
RefreshB。。。。Fri Sep 21 10:07:45 CST 2018
TimeOut Zzzzzzz: Fri Sep 21 10:07:50 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:50 CST 2018
No elements
....

But, the output is not what I expected, why ThreadB outPut TimeOut Zzz at first?I think there shoud not output this item.Because ThreadA wakeup ThreadC, The next step should exec ThreadC and Thread can run finished within in a short time not beyond 5s.Can anyone help me explain or fix it? Thank u!

Expected :

notice C: Fri Sep 21 10:28:31 CST 2018
I'm thread C Fri Sep 21 10:28:**31** CST 2018
I'm thread C, and I wake up Fri Sep 21 10:28:31 CST 2018
OUTPUT: Queen Size :3
012
I'm thread C Fri Sep 21 10:28:31 CST 2018
notice C: Fri Sep 21 10:28:31 CST 2018
RefreshB。。。。Fri Sep 21 10:28:31 CST 2018           
...

Solution

  • Here is what is happening in the specific scenario you posted: A starts, and runs the while loop till it signals C and waits. Then C starts and waits, and here the problem occurs; A signalled before C started waiting, so that signal call was lost, and now both A and C are waiting. So at this point, what is on the console is

    notify C ...
    I'm thread C...

    Now B starts, and waits the full 5 seconds because no other thread is available to signal it. As such, conB.await(5, TimeUnit.SECONDS) returns false, it prints Timeout Zzzzzzz:, and then signals C. And that's why things seem out of order, a signal only has effect when another thread is already waiting!

    To fix this, try changing

    conC.await();
    System.out.println("I'm thread C, and I wake up " + new Date());
    

    to

    if (readQueueA.size() < 3) {
        conC.await();
        System.out.println("I'm thread C, and I wake up " + new Date());
    }
    

    This way, C won't wait if the condition it is waiting for has already been fulfilled.