java multithreading conditional-statements reentrantlock

Java ReentrantLock and Condition | producers finish work, consumer gets stuck


General Information: Three reader threads read chunks from a file in random order, where each chunk has an ID, and add them to a plain ArrayList. A writer thread writes a chunk to an output file as soon as the chunk with the needed ID has been added to the list.

For that reason I have written a BlockingChunkList which should synchronize the add() and getNextChunk() methods.

It works for me with synchronized + notify + notifyAll in one case and with a synchronized list in another.

I can't get it to work with ReentrantLock and Condition. The writer thread only writes four chunks and then gets stuck.

Why it might not work: I suspect that once the readers are done, the writer doesn't get the lock back. However, I would expect the writer thread to be woken every time there is something to write (available=true). They seem to ignore hasAccess.await() when available is true.

How it should work: The reading threads only call the add() method, and they release the writing thread only when there is something to write (available=true). They also block themselves when available=true. This block is released when the writer has written something and calls hasAccess.signalAll(). The writing thread only calls the getNextChunk() method, and it releases the other threads once it has written the chunk. It blocks itself when available=false and is released by the readers.

Question: The reading threads finish their work and the writing thread only writes the first 4 chunks. I expect that the writer is always called when available=true.

I don't need an exact solution; a hint is appreciated as well, since I think I am missing something. So: what am I missing?

Thank You

EDIT: Concurrency is handled only in the posted class. The main method only starts the threads.

EDIT 2: This is one of my first shots at concurrency. I know that ArrayList is not thread-safe. I would like to make it so by using ReentrantLock and Condition in order to understand the concepts. The BASIC idea is to block either the readers or the writer depending on whether available is true or false.

import java.util.ArrayList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

    public class BlockingChunkQueue {

        private final ArrayList<Chunk> chunks;
        private volatile int currentID = 0; // first ID for which the writer needs to wait
        private volatile boolean available; // true if the ID is in the list

        private final Lock listLock;
        private final Condition hasAccess;

        public BlockingChunkQueue(){ 
            chunks = new ArrayList<>(); 
            listLock = new ReentrantLock();
            hasAccess = listLock.newCondition();
            available = false;
        }

        /*
         * While a chunk with the correct ID is waiting to be written to the output, wait for access.
         * Then add the chunk. If the added chunk has the ID for which the other thread is waiting,
         * set available = true and signal the waiting thread.
         */
        public void add(Chunk chunk) throws InterruptedException{
            listLock.lock();
            try{
                while(available){
                    hasAccess.await(); // reader block yourself until you get the lock back
                }
                chunks.add(chunk);

                if(chunk.getId() == currentID){
                    available = true;
                    hasAccess.signalAll();
                }
            }finally{
                listLock.unlock();
            }
        }

        /*
         * If the chunk is not available then wait.
         * If it becomes available then increment the ID, remove it from the list, signal the reader-threads and set available false.
         * return the chunk which will be written to the output.
         */
        public Chunk getNextChunk() throws InterruptedException {
            listLock.lock();
            try{
                while(!available){
                    hasAccess.await(); // block yourself until u can write something
                }

                for(Chunk c : chunks){
                    if(c.getId() == currentID){
                        currentID++;
                        chunks.remove(c);
                        hasAccess.signalAll(); //signal the readers
                        available = false;
                        return c;
                    }
                }

            }finally{
                listLock.unlock();
            }
            return null;
        }

        public int getcurrentID(){ return currentID;}

        public boolean isEmpty(){ return chunks.isEmpty(); }

        public int size(){return chunks.size();}
    }

SOLUTION: There was nothing wrong with the thread handling. It turned out to be a logical error on my side. The writing thread gets stuck because it doesn't get the chance to check for the necessary IDs, since the readers read the chunks in random order. Thanks for the helpful answer.


Solution

  • There are several problems here.

    What is the purpose of the volatile variable available which is only read or mutated while the lock is held?

    The isEmpty and size methods call methods on chunks without holding the lock. ArrayList is not thread-safe. The behavior of these calls cannot be predicted.

    A reason you might get stuck is if multiple chunks get added before getNextChunk is called.

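    In your loop to find the "current" chunk, you set available to false after removing it, but the chunk with the next ID may already be in the list. In that case no later add() call will set available back to true, and the writer waits forever: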

                for(Chunk c : chunks){
                    if(c.getId() == currentID){
                        currentID++;
                        chunks.remove(c);
                        hasAccess.signalAll(); //signal the readers
                        available = false;
                        return c;
                    }
                }
    

    Consider storing your chunks in a Map<Integer,Chunk> so that you can easily see whether a chunk is present by its identifier.
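
A minimal sketch of the Map idea, not a drop-in replacement for the class in the question. The names OrderedChunkBuffer and nextChunkArrived, and the stand-in Chunk class with a String payload, are assumptions made for illustration. It also makes two simplifications that the original does not: readers never block in add(), so the buffer is unbounded, and only one writer thread is assumed to call getNextChunk(). The wait condition is "the map contains currentId" rather than a separate available flag, so a chunk that arrived early cannot be missed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class OrderedChunkBuffer {

    // Hypothetical stand-in for the Chunk class, which the question does not show.
    public static final class Chunk {
        private final int id;
        private final String data;

        public Chunk(int id, String data) {
            this.id = id;
            this.data = data;
        }

        public int getId() { return id; }

        public String getData() { return data; }
    }

    private final Map<Integer, Chunk> chunks = new HashMap<>();
    private final Lock lock = new ReentrantLock();
    private final Condition nextChunkArrived = lock.newCondition();
    private int currentId = 0; // only touched while holding the lock, so no volatile

    // Called by the readers. Never blocks (assumption: an unbounded buffer is acceptable).
    public void add(Chunk chunk) {
        lock.lock();
        try {
            chunks.put(chunk.getId(), chunk);
            if (chunk.getId() == currentId) {
                nextChunkArrived.signalAll(); // wake the writer
            }
        } finally {
            lock.unlock();
        }
    }

    // Called by the writer. Blocks until the chunk with the next ID is present.
    public Chunk getNextChunk() throws InterruptedException {
        lock.lock();
        try {
            // Re-check the map itself, so chunks that arrived early are found.
            while (!chunks.containsKey(currentId)) {
                nextChunkArrived.await();
            }
            Chunk next = chunks.remove(currentId);
            currentId++;
            return next;
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return chunks.size();
        } finally {
            lock.unlock();
        }
    }

    public boolean isEmpty() {
        lock.lock();
        try {
            return chunks.isEmpty();
        } finally {
            lock.unlock();
        }
    }
}
```

If the readers must be throttled, as in the question, a second Condition for "space available" could be added to the same lock. That is left out here to keep the sketch short.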